From ffc4af4b9bc6bb56dfb57a04eb8cb3fd53086c59 Mon Sep 17 00:00:00 2001 From: Morgan McMillian Date: Sat, 4 Mar 2017 18:14:32 -0800 Subject: [PATCH] channel monitor --- appservice.py | 8 --- pnut-bridge.py | 181 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 8 deletions(-) create mode 100644 pnut-bridge.py diff --git a/appservice.py b/appservice.py index 59cf853..f98f668 100644 --- a/appservice.py +++ b/appservice.py @@ -12,14 +12,6 @@ db.init_app(app) txId = 0 -@app.route("/testme/") -def on_testme(hello): - logging.debug(hello) - user = MatrixUser.query.filter_by(matrix_id=hello).first() - logging.debug(user) - logging.debug(app.config['MATRIX_PNUT_PREFIX']) - return jsonify({}) - @app.route("/transactions/", methods=["PUT"]) def on_receive_events(transaction): global txId diff --git a/pnut-bridge.py b/pnut-bridge.py new file mode 100644 index 0000000..0308d05 --- /dev/null +++ b/pnut-bridge.py @@ -0,0 +1,181 @@ +import yaml +import logging +import threading +import signal +import time +import datetime + +from pnutlib import Pnut +from appservice import app +from models import * + +_shutdown = threading.Event() + +class ChannelMonitor(threading.Thread): + + txId = 0 + + def __init__(self): + threading.Thread.__init__(self) + self.matrix_api_url = app.config['MATRIX_HOST'] + '/_matrix/client/r0' + self.matrix_api_token = app.config['MATRIX_AS_TOKEN'] + + def generate_matrix_id(self, username): + m_username = app.config['MATRIX_PNUT_PREFIX'] + username + m_userid = "@" + app.config['MATRIX_PNUT_PREFIX'] + username + ":" + app.config['MATRIX_DOMAIN'] + m_display = username + " (pnut)" + return {'username': m_username, 'user_id': m_userid, 'displayname': m_display} + + def is_registered(self, user_id): + url = self.matrix_api_url + '/profile/' + user_id + r = requests.get(url) + if r.status_code == 200: + rdata = r.json() + if 'displayname' in rdata: + return rdata['displayname'] + else: + return False + else: + return False + + def create_matrix_user(self, username): + url = self.matrix_api_url + '/register' + params = { + 'access_token': self.matrix_api_token + } + data = { + 'type': 'm.login.application_service', + 'user': username + } + r = requests.post(url, params=params, data=json.dumps(data)) + if r.status_code == 200: + logging.info('REGISTERED USER: ' + username) + logging.debug(r.text) + + def set_displayname(self, muser): + url = self.matrix_api_url + '/profile/' + muser['user_id'] + '/displayname' + params = { + 'access_token': self.matrix_api_token, + 'user_id': muser['user_id'] + } + data = { + 'displayname': muser['displayname'] + } + headers = {'Content-Type': 'application/json'} + r = requests.put(url, params=params, data=json.dumps(data), headers=headers) + + def join_room(self, user_id, roomid): + url = self.matrix_api_url + '/join/' + roomid + params = { + 'access_token': self.matrix_api_token, + 'user_id': user_id + } + r = requests.post(url, params=params) + + def send_message(self, roomid, msg): + url = self.matrix_api_url + '/rooms/' + roomid +'/send/m.room.message' + '/' + str(txId) + params = { + 'access_token': self.matrix_api_token, + 'user_id': msg['user']['user_id'], + 'ts': msg['ts'] + } + data = { + 'msgtype': 'm.text', + 'body': msg['text'] + } + headers = {'Content-Type': 'application/json'} + r = requests.put(url, params=params, data=json.dumps(data), headers=headers) + if r.status_code == 200: + eventid = r.json()['event_id'] + el = MatrixMsgEvents(eventid, roomid, msg['msgid'], msg['puser'], msg['chan']) + db.session.add(el) + db.session.commit() + + def poll_channel(self, room): + r = Pnut().get_channel_stream(room.pnut_chan, room.pnut_since) + if r is not None and r.status_code == 200: + rdata = r.json() + pqueue = [] + + for post in rdata['data']: + if post['user']['username'] == app.config['MATRIX_PNUT_USER']: + continue + if post['source']['id'] == app.config['PNUTCLIENT_ID']: + continue + + if 'content' in post: + user = self.generate_matrix_id(post['user']['username']) + text = post['content']['text'] + "\n" + dt = datetime.datetime.strptime(post["created_at"], "%Y-%m-%dT%H:%M:%SZ") + ts = time.mktime(dt.timetuple()) + msgid = post['id'] + puser = post['user']['username'] + + # handle entities + if 'entities' in post['content']: + if 'links' in post['content']['entities']: + for lnk in post['content']['entities']['links']: + text += "\n" + if 'title' in lnk: + text += lnk['title'] + "\n" + if 'link' in lnk: + text += lnk['link'] + "\n" + + # handle raw data + if 'raw' in post: + for raw in post['raw']: + text += "\n" + if raw['type'] == 'io.pnut.core.oembed': + if 'title' in raw['value']: + text += raw['value']['title'] + "\n" + if 'url' in raw['value']: + text += raw['value']['url'] + "\n" + + pqueue.insert(0, + {'user':user,'text':text,'ts':ts,'msgid':msgid,'puser':puser,'chan':room.pnut_chan}) + + if len(rdata["data"]) > 0: + room.pnut_since = rdata["data"][0]["id"] + db.session.commit() + + for item in pqueue: + txId += 1 + + d = self.is_registered(item['user']['user_id']) + if not d: + self.create_matrix_user(item['user']['username']) + + if d != item['user']['displayname']: + self.set_displayname(item['user']) + + self.join_room(item['user']['user_id'], room.room_id) + time.sleep(1) + + self.send_message(room.room_id, item) + + + def run(self): + logging.info("-- Starting channel monitor --") + app.app_context().push() + while not _shutdown.isSet(): + rooms = MatrixRoom.query.all() + for r in rooms: + self.poll_channel(r) + time.sleep(15) + logging.info("-- Stopping channel monitor --") + +if __name__ == '__main__': + + with open("config.yaml", "rb") as config_file: + config = yaml.load(config_file) + + logging.basicConfig(level=logging.DEBUG) + + app.config.update(config) + + monitor = ChannelMonitor() + monitor.start() + app.run(port=config['LISTEN_PORT']) + + _shutdown.set() + logging.info("-- Shutdown in progress --") \ No newline at end of file