import calendar
import datetime
import json
import logging
import signal
import threading
import time

import pnutpy
import requests
import yaml

from appservice import app
from models import *

# Set by the main script once the web app returns; the monitor thread checks
# it to know when to stop polling.
_shutdown = threading.Event()


class ChannelMonitor(threading.Thread):
    """Background thread that copies pnut channel messages into Matrix rooms.

    Every cycle it loads all ``MatrixRoom2`` rows, polls the pnut channel of
    each one, redacts Matrix events whose pnut message was deleted, and posts
    new pnut messages to the mapped Matrix room through the client-server API
    at ``MATRIX_HOST``, authenticating with ``MATRIX_AS_TOKEN``.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        pnutpy.api.add_authorization_token(app.config['MATRIX_PNUT_TOKEN'])
        self.matrix_api_url = app.config['MATRIX_HOST'] + '/_matrix/client/r0'
        self.matrix_api_token = app.config['MATRIX_AS_TOKEN']
        # Counter used as the transaction id in send/redact URLs.
        self.txId = 0

    def generate_matrix_id(self, username):
        """Build the Matrix identity used for a pnut user.

        Returns a dict with ``username`` (prefixed localpart), ``user_id``
        (``@<prefix><username>:<MATRIX_DOMAIN>``) and ``displayname``
        (``"<username> (pnut)"``).
        """
        localpart = app.config['MATRIX_PNUT_PREFIX'] + username
        return {
            'username': localpart,
            'user_id': "@" + localpart + ":" + app.config['MATRIX_DOMAIN'],
            'displayname': username + " (pnut)",
        }

    def is_registered(self, user_id):
        """Look up the Matrix profile of *user_id*.

        Returns the profile's ``displayname`` when the profile request
        answers 200 and carries one; returns ``False`` otherwise (including
        the case of an existing profile without a display name).
        """
        r = requests.get(self.matrix_api_url + '/profile/' + user_id)
        if r.status_code != 200:
            return False
        profile = r.json()
        if 'displayname' in profile:
            return profile['displayname']
        return False

    def create_matrix_user(self, username):
        """Register *username* using the ``m.login.application_service`` type."""
        url = self.matrix_api_url + '/register'
        params = {'access_token': self.matrix_api_token}
        data = {
            'type': 'm.login.application_service',
            'user': username,
        }
        r = requests.post(url, params=params, data=json.dumps(data))
        if r.status_code == 200:
            logging.info('REGISTERED USER: ' + username)
            logging.info(r.text)
        else:
            # Previously a failed registration left no trace at all.
            logging.debug('error: ' + str(r.status_code))
            logging.debug(r.text)

    def set_displayname(self, muser):
        """Set the profile display name of *muser* (a ``generate_matrix_id`` dict)."""
        url = (self.matrix_api_url + '/profile/' + muser['user_id']
               + '/displayname')
        params = {
            'access_token': self.matrix_api_token,
            'user_id': muser['user_id'],
        }
        data = {'displayname': muser['displayname']}
        headers = {'Content-Type': 'application/json'}
        requests.put(url, params=params, data=json.dumps(data),
                     headers=headers)

    def join_room(self, user_id, roomid):
        """Join *roomid* as *user_id*; on a 403, invite the user and retry once."""
        url = self.matrix_api_url + '/join/' + roomid
        params = {
            'access_token': self.matrix_api_token,
            'user_id': user_id,
        }
        r = requests.post(url, params=params)
        if r.status_code == 403:
            self.invite_room(user_id, roomid)
            requests.post(url, params=params)

    def invite_room(self, user_id, roomid):
        """Invite *user_id* to *roomid*; the request carries no ``user_id`` parameter."""
        url = self.matrix_api_url + '/rooms/' + roomid + "/invite"
        headers = {"Content-Type": "application/json"}
        params = {'access_token': self.matrix_api_token}
        body = {'user_id': user_id}
        requests.post(url, headers=headers, params=params,
                      data=json.dumps(body))

    def send_message(self, roomid, msg):
        """Post one queued pnut message to *roomid* as an ``m.text`` message.

        *msg* is a queue item built by ``poll_channel`` (keys ``user``,
        ``text``, ``msgid``, ``puser``, ``chan``). The request is sent with
        ``user_id`` set to the bridged user's Matrix id. On a 200 response
        the returned event id is stored in a ``MatrixMsgEvents`` row so the
        message can later be redacted; other responses are logged at debug
        level.
        """
        url = (self.matrix_api_url + '/rooms/' + roomid
               + '/send/m.room.message' + '/' + str(self.txId))
        params = {
            'access_token': self.matrix_api_token,
            'user_id': msg['user']['user_id'],
        }
        data = {
            'msgtype': 'm.text',
            'body': msg['text'],
        }
        headers = {'Content-Type': 'application/json'}
        r = requests.put(url, params=params, data=json.dumps(data),
                         headers=headers)
        if r.status_code == 200:
            eventid = r.json()['event_id']
            el = MatrixMsgEvents(eventid, roomid, msg['msgid'], msg['puser'],
                                 msg['chan'])
            db.session.add(el)
            db.session.commit()
        else:
            logging.debug('error: ' + str(r.status_code))
            logging.debug(r.text)

    def delete_msgs(self, room, ids):
        """Redact the Matrix event recorded for each pnut message id in *ids*.

        Ids without a matching, not-yet-deleted ``MatrixMsgEvents`` row are
        skipped.
        """
        for pnut_id in ids:
            r_event = MatrixMsgEvents.query.filter_by(
                pnut_msgid=pnut_id, deleted=False).first()
            logging.debug(r_event)
            if r_event:
                logging.debug(r_event.pnut_msgid)
                logging.debug(r_event.event_id)
                logging.debug(r_event.deleted)
                self.redact_event(room, r_event)

    def redact_event(self, room_id, event):
        """Flag *event* as deleted in the database and redact it in *room_id*.

        Any exception is logged and swallowed.
        """
        url = (self.matrix_api_url + "/rooms/" + room_id + "/redact/"
               + event.event_id + "/" + str(self.txId))
        headers = {"Content-Type": "application/json"}
        params = {'access_token': self.matrix_api_token}
        try:
            # NOTE(review): the row is committed as deleted before the
            # request is made, so a failed redaction is not retried by
            # delete_msgs -- confirm this is intended.
            event.deleted = True
            db.session.commit()
            r = requests.put(url, params=params, data="{}", headers=headers)
            logging.debug(r.text)
        except Exception:
            logging.exception('redact_event')

    def update_marker(self, chan, msgid):
        """Post a ``channel:<chan>`` marker with id *msgid* to pnut; errors are logged."""
        try:
            response = pnutpy.api.request_json(
                'POST', '/markers',
                data=[{'name': 'channel:' + chan, 'id': msgid}])
            logging.debug(response)
        except Exception:
            logging.exception('update_marker')

    def _compose_text(self, msg):
        """Return the Matrix body for a pnut message: text, links, oembed info."""
        parts = [msg.content.text, "\n"]
        for lnk in msg.content.entities.links:
            parts.append("\n")
            if 'title' in lnk:
                parts.append(lnk.title + "\n")
            if 'link' in lnk:
                parts.append(lnk.link + "\n")
        for raw in msg.raw:
            if raw.type == 'io.pnut.core.oembed':
                if 'title' in raw.value:
                    parts.append(raw.value.title + "\n")
                if 'url' in raw.value:
                    parts.append(raw.value.url + "\n")
        return "".join(parts)

    def _deliver(self, room, item):
        """Make sure the bridged user exists and is in the room, then send *item*."""
        self.txId += 1
        muser = item['user']
        current_name = self.is_registered(muser['user_id'])
        if not current_name:
            self.create_matrix_user(muser['username'])
        if current_name != muser['displayname']:
            self.set_displayname(muser)
        self.join_room(muser['user_id'], room.room_id)
        time.sleep(1)
        self.send_message(room.room_id, item)

    def poll_channel(self, room):
        """Fetch unread messages of ``room.pnut_chan`` and relay them to Matrix.

        Processes ``deleted_ids`` from the response meta, skips messages
        authored by the bridge itself, stores the newest message id on the
        room, updates the pnut stream marker and finally delivers the
        messages in reverse of the order pnut returned them. When fetching
        fails the error is logged and the method returns after a 30 second
        back-off (cut short if shutdown is requested).

        Exceptions raised while delivering to Matrix are not caught here.
        """
        try:
            messages, meta = pnutpy.api.get_channel_messages(
                room.pnut_chan, since_id='last_read', include_raw=1)
            logging.debug(meta)
            if 'deleted_ids' in meta:
                self.delete_msgs(room.room_id, meta['deleted_ids'])
        except pnutpy.errors.PnutRateLimitAPIException:
            logging.warning('*** Rate limit error while trying to fetch messages! Waiting to retry. ***')
            # Same 30 s back-off as before, but wakes up early on shutdown.
            _shutdown.wait(30)
            return
        except Exception:
            logging.warning('*** An error occured while trying to fetch messages! Waiting to retry. ***')
            logging.exception('poll_channel')
            _shutdown.wait(30)
            return

        pqueue = []
        for msg in messages:
            # bypass messages posted by the bridge to avoid duplicates and loops
            if msg.user.username == app.config['MATRIX_PNUT_USER']:
                continue
            if msg.source.id == app.config['PNUTCLIENT_ID']:
                continue
            if 'content' not in msg:
                continue

            # Epoch seconds computed in UTC; time.mktime() would read the
            # fields as local time and skew the value by the host's offset.
            # Assumes a naive created_at is UTC -- TODO confirm.
            ts = calendar.timegm(msg.created_at.utctimetuple())
            pqueue.append({
                'user': self.generate_matrix_id(msg.user.username),
                'text': self._compose_text(msg),
                'ts': ts,
                'msgid': msg.id,
                'puser': msg.user.username,
                'chan': room.pnut_chan,
            })

        # update the last message id
        if messages:
            room.pnut_since = messages[0].id
            db.session.commit()

        # update the stream marker
        # NOTE(review): the marker moves before delivery, so messages whose
        # delivery fails below are not fetched again -- confirm acceptable.
        if 'max_id' in meta:
            self.update_marker(room.pnut_chan, meta.max_id)

        # empty the queue to the matrix room, in reverse of the order the
        # messages were returned by pnut
        for item in reversed(pqueue):
            self._deliver(room, item)

    def run(self):
        """Thread entry point: poll every room until ``_shutdown`` is set."""
        logging.info("-- Starting channel monitor --")
        app.app_context().push()

        rooms = MatrixRoom2.query.all()
        if rooms:
            try:
                # Seed the transaction counter from the first room's last
                # seen pnut message id.
                self.txId = int(rooms[0].pnut_since)
            except (TypeError, ValueError):
                logging.warning('could not seed txId from pnut_since; keeping %s',
                                self.txId)

        while not _shutdown.is_set():
            try:
                for room in MatrixRoom2.query.all():
                    if _shutdown.is_set():
                        break
                    self.poll_channel(room)
                    _shutdown.wait(.5)
            except Exception:
                # Top-level boundary: one failed cycle (e.g. the homeserver
                # being unreachable) must not kill the monitor thread.
                logging.exception('channel monitor cycle failed')
            _shutdown.wait(3)

        logging.info("-- Stopping channel monitor --")


if __name__ == '__main__':
    with open("config.yaml", "rb") as config_file:
        # safe_load: plain yaml.load() without a Loader is rejected by
        # PyYAML >= 6 and can construct arbitrary objects on older versions.
        config = yaml.safe_load(config_file)

    logging.basicConfig(level=logging.INFO)
    app.config.update(config)

    monitor = ChannelMonitor()
    monitor.start()
    app.run(port=config['LISTEN_PORT'])

    # app.run() has returned: ask the monitor thread to stop.
    _shutdown.set()
    logging.info("-- Shutdown in progress --")