pnut-matrix/pnut-bridge.py

251 lines
8.8 KiB
Python
Raw Permalink Normal View History

2017-03-05 02:14:32 +00:00
import yaml
import logging
import threading
import signal
import time
import datetime
2017-03-05 02:37:02 +00:00
import requests
2017-03-05 02:38:16 +00:00
import json
import pnutpy
2017-03-05 02:14:32 +00:00
from appservice import app
from models import *
# Process-wide stop flag: the __main__ script sets it once app.run() returns,
# and ChannelMonitor.run() checks it to decide when to leave its polling loop.
_shutdown = threading.Event()
class ChannelMonitor(threading.Thread):
    """Background thread that relays pnut channel messages into Matrix rooms.

    The thread repeatedly polls every ``MatrixRoom2`` row for new pnut
    messages, makes sure a bridged Matrix user exists for each author, and
    posts the message text into the linked Matrix room using the application
    service token. Message deletions reported by pnut are turned into Matrix
    redactions.
    """

    # Seconds to wait for any single Matrix HTTP request. Without a timeout a
    # stalled homeserver would block the monitor thread indefinitely.
    HTTP_TIMEOUT = 30

    # Every request that carries a JSON body is sent with this header.
    JSON_HEADERS = {'Content-Type': 'application/json'}

    def __init__(self):
        """Configure pnutpy and cache the Matrix endpoint and token from app.config."""
        threading.Thread.__init__(self)
        pnutpy.api.add_authorization_token(app.config['MATRIX_PNUT_TOKEN'])
        self.matrix_api_url = app.config['MATRIX_HOST'] + '/_matrix/client/r0'
        self.matrix_api_token = app.config['MATRIX_AS_TOKEN']
        # Counter used as the transaction id in send/redact URLs; run() seeds
        # it and poll_channel() bumps it once per delivered message.
        self.txId = 0

    def _auth_params(self, user_id=None):
        """Return the query parameters for a Matrix request.

        When ``user_id`` is given it is added so the request is made on
        behalf of that bridged user.
        """
        params = {'access_token': self.matrix_api_token}
        if user_id is not None:
            params['user_id'] = user_id
        return params

    def generate_matrix_id(self, username):
        """Build the Matrix identity used for a pnut username.

        Returns a dict with ``username`` (prefixed localpart), ``user_id``
        (``@localpart:domain``) and ``displayname`` (``"<username> (pnut)"``).
        """
        m_username = app.config['MATRIX_PNUT_PREFIX'] + username
        m_userid = "@" + m_username + ":" + app.config['MATRIX_DOMAIN']
        m_display = username + " (pnut)"
        return {'username': m_username, 'user_id': m_userid, 'displayname': m_display}

    def is_registered(self, user_id):
        """Return the user's display name if the profile lookup yields one.

        Returns ``False`` when the profile request does not answer 200 or the
        response has no ``displayname`` key.
        """
        url = self.matrix_api_url + '/profile/' + user_id
        r = requests.get(url, timeout=self.HTTP_TIMEOUT)
        if r.status_code != 200:
            return False
        rdata = r.json()
        if 'displayname' in rdata:
            return rdata['displayname']
        return False

    def create_matrix_user(self, username):
        """Register ``username`` through the application-service login type."""
        url = self.matrix_api_url + '/register'
        data = {
            'type': 'm.login.application_service',
            'user': username,
        }
        r = requests.post(url, params=self._auth_params(), data=json.dumps(data),
                          headers=self.JSON_HEADERS, timeout=self.HTTP_TIMEOUT)
        if r.status_code == 200:
            logging.info('REGISTERED USER: %s', username)
            logging.info(r.text)
        else:
            # Previously a failed registration left no trace in the log.
            logging.warning('registration of %s failed: %s', username, r.status_code)
            logging.warning(r.text)

    def set_displayname(self, muser):
        """Set the display name of the bridged user described by ``muser``.

        ``muser`` is a dict as returned by :meth:`generate_matrix_id`.
        """
        url = self.matrix_api_url + '/profile/' + muser['user_id'] + '/displayname'
        data = {'displayname': muser['displayname']}
        requests.put(url, params=self._auth_params(muser['user_id']),
                     data=json.dumps(data), headers=self.JSON_HEADERS,
                     timeout=self.HTTP_TIMEOUT)

    def join_room(self, user_id, roomid):
        """Join ``user_id`` to ``roomid``; on a 403, invite the user and retry once."""
        url = self.matrix_api_url + '/join/' + roomid
        params = self._auth_params(user_id)
        r = requests.post(url, params=params, timeout=self.HTTP_TIMEOUT)
        if r.status_code == 403:
            self.invite_room(user_id, roomid)
            requests.post(url, params=params, timeout=self.HTTP_TIMEOUT)

    def invite_room(self, user_id, roomid):
        """Invite ``user_id`` to ``roomid`` using the application service token."""
        url = self.matrix_api_url + '/rooms/' + roomid + "/invite"
        body = {'user_id': user_id}
        requests.post(url, headers=self.JSON_HEADERS, params=self._auth_params(),
                      data=json.dumps(body), timeout=self.HTTP_TIMEOUT)

    def send_message(self, roomid, msg):
        """Post a queued pnut message to ``roomid`` and record the resulting event.

        ``msg`` is a queue item built by :meth:`poll_channel`; the keys read
        here are ``user``, ``text``, ``msgid``, ``puser`` and ``chan``. The
        ``ts`` key is not forwarded to Matrix.
        """
        url = (self.matrix_api_url + '/rooms/' + roomid
               + '/send/m.room.message' + '/' + str(self.txId))
        data = {
            'msgtype': 'm.text',
            'body': msg['text'],
        }
        r = requests.put(url, params=self._auth_params(msg['user']['user_id']),
                         data=json.dumps(data), headers=self.JSON_HEADERS,
                         timeout=self.HTTP_TIMEOUT)
        if r.status_code == 200:
            # Remember which Matrix event mirrors which pnut message so a
            # later pnut deletion can be turned into a redaction.
            eventid = r.json()['event_id']
            el = MatrixMsgEvents(eventid, roomid, msg['msgid'], msg['puser'], msg['chan'])
            db.session.add(el)
            db.session.commit()
        else:
            # Logged as a warning: at the INFO level configured by the main
            # script a debug record would hide the lost message.
            logging.warning('error: %s', r.status_code)
            logging.warning(r.text)

    def delete_msgs(self, room, ids):
        """Redact the Matrix events that mirror the given pnut message ids.

        Ids with no matching, not-yet-deleted ``MatrixMsgEvents`` row are skipped.
        """
        for pnut_id in ids:
            r_event = MatrixMsgEvents.query.filter_by(pnut_msgid=pnut_id, deleted=False).first()
            logging.debug(r_event)
            if r_event:
                logging.debug(r_event.pnut_msgid)
                logging.debug(r_event.event_id)
                logging.debug(r_event.deleted)
                self.redact_event(room, r_event)

    def redact_event(self, room_id, event):
        """Mark ``event`` as deleted in the database and redact it in Matrix.

        Any exception is logged and swallowed so one failed redaction does not
        stop the remaining ones.
        """
        url = (self.matrix_api_url + "/rooms/" + room_id + "/redact/"
               + event.event_id + "/" + str(self.txId))
        try:
            # NOTE(review): the row is flagged before the HTTP call, so a
            # failed request is never retried. Kept as in the original --
            # confirm this ordering is intentional.
            event.deleted = True
            db.session.commit()
            r = requests.put(url, params=self._auth_params(), data="{}",
                             headers=self.JSON_HEADERS, timeout=self.HTTP_TIMEOUT)
            logging.debug(r.text)
        except Exception:
            logging.exception('redact_event')

    def update_marker(self, chan, msgid):
        """Move the pnut stream marker ``channel:<chan>`` to ``msgid`` (best effort)."""
        try:
            response = pnutpy.api.request_json(
                'POST', '/markers', data=[{'name': 'channel:' + chan, 'id': msgid}])
            logging.debug(response)
        except Exception:
            logging.exception('update_marker')

    def _build_queue_item(self, msg, room):
        """Turn one pnut message into the dict consumed by send_message()."""
        user = self.generate_matrix_id(msg.user.username)
        # NOTE(review): time.mktime() interprets the time tuple as local time;
        # if created_at is UTC this is offset on non-UTC hosts. The value is
        # currently not sent to Matrix, so it is left unchanged.
        ts = int(time.mktime(msg.created_at.timetuple()))

        parts = [msg.content.text + "\n"]
        # Append every link entity (title and URL) below the message text.
        for lnk in msg.content.entities.links:
            parts.append("\n")
            if 'title' in lnk:
                parts.append(lnk.title + "\n")
            if 'link' in lnk:
                parts.append(lnk.link + "\n")
        # Append oEmbed attachments delivered in the raw items.
        for raw in msg.raw:
            if raw.type == 'io.pnut.core.oembed':
                if 'title' in raw.value:
                    parts.append(raw.value.title + "\n")
                if 'url' in raw.value:
                    parts.append(raw.value.url + "\n")

        return {
            'user': user,
            'text': "".join(parts),
            'ts': ts,
            'msgid': msg.id,
            'puser': msg.user.username,
            'chan': room.pnut_chan,
        }

    def _deliver(self, room, item):
        """Ensure the author exists and has joined the room, then send ``item``."""
        self.txId += 1
        user = item['user']

        displayname = self.is_registered(user['user_id'])
        if not displayname:
            self.create_matrix_user(user['username'])
        if displayname != user['displayname']:
            self.set_displayname(user)

        self.join_room(user['user_id'], room.room_id)
        # Pause kept from the original; presumably it gives the homeserver
        # time to process the join -- TODO confirm.
        time.sleep(1)

        self.send_message(room.room_id, item)

    def poll_channel(self, room):
        """Fetch new pnut messages for ``room`` and relay them to Matrix.

        Deletions reported in the response metadata are redacted first. On a
        fetch error the method waits up to 30 seconds (less if shutdown is
        requested) and returns without relaying anything.
        """
        try:
            messages, meta = pnutpy.api.get_channel_messages(
                room.pnut_chan, since_id=room.pnut_since, include_raw=1)
            logging.debug(meta)
            if 'deleted_ids' in meta:
                self.delete_msgs(room.room_id, meta['deleted_ids'])
        except pnutpy.errors.PnutRateLimitAPIException:
            logging.warning('*** Rate limit error while trying to fetch messages! Waiting to retry. ***')
            _shutdown.wait(30)
            return
        except Exception:
            logging.warning('*** An error occured while trying to fetch messages! Waiting to retry. ***')
            logging.exception('poll_channel')
            _shutdown.wait(30)
            return

        pqueue = []
        for msg in messages:
            # bypass messages posted by the bridge to avoid duplicates and loops
            if msg.user.username == app.config['MATRIX_PNUT_USER']:
                continue
            if msg.source.id == app.config['PNUTCLIENT_ID']:
                continue
            if 'content' in msg:
                pqueue.append(self._build_queue_item(msg, room))

        # update the last message id (messages[0] is treated as the latest)
        if messages:
            room.pnut_since = messages[0].id
            db.session.commit()

        # update the stream marker
        if 'max_id' in meta:
            self.update_marker(room.pnut_chan, meta.max_id)

        # Empty the queue to the matrix room in the reverse of the order pnut
        # returned the messages (the original code prepended each item).
        for item in reversed(pqueue):
            self._deliver(room, item)

    def run(self):
        """Poll all bridged rooms until the module-level shutdown event is set."""
        logging.info("-- Starting channel monitor --")
        app.app_context().push()

        rooms = MatrixRoom2.query.all()
        if rooms:
            # Seed the transaction counter from the first room's last-seen id.
            try:
                self.txId = int(rooms[0].pnut_since)
            except (TypeError, ValueError):
                # An unset or non-numeric value would otherwise kill the
                # thread before it polls anything; keep the default of 0.
                logging.warning('could not seed txId from pnut_since=%r',
                                rooms[0].pnut_since)

        while not _shutdown.is_set():
            for room in MatrixRoom2.query.all():
                if _shutdown.is_set():
                    break
                try:
                    self.poll_channel(room)
                except Exception:
                    # Thread boundary: a failure while relaying one room must
                    # not stop the monitor for every other room.
                    logging.exception('poll_channel failed for room %s', room.room_id)
                time.sleep(.5)
            # Event.wait() returns as soon as shutdown is requested instead of
            # always sleeping the full interval.
            _shutdown.wait(3)

        logging.info("-- Stopping channel monitor --")
if __name__ == '__main__':
    # Entry point: load the configuration, start the pnut -> Matrix monitor
    # thread, then run the application-service web app in the foreground.

    # safe_load() builds plain Python data only. A bare yaml.load() can
    # construct arbitrary objects and requires an explicit Loader argument on
    # current PyYAML releases.
    with open("config.yaml", "rb") as config_file:
        config = yaml.safe_load(config_file)

    logging.basicConfig(level=logging.INFO)

    app.config.update(config)

    monitor = ChannelMonitor()
    monitor.start()
    try:
        # Blocks until the web server stops.
        app.run(port=config['LISTEN_PORT'])
    finally:
        # Always signal the monitor thread, even if app.run() raises;
        # otherwise the non-daemon thread would keep the process alive.
        _shutdown.set()
        logging.info("-- Shutdown in progress --")