import calendar
import datetime
import json
import logging
import signal
import threading
import time

import requests
import yaml

from pnutlib import Pnut
from appservice import app
from models import *  # provides db, MatrixRoom and MatrixMsgEvents

# Set once the web app exits so the monitor thread can stop cleanly.
_shutdown = threading.Event()


class ChannelMonitor(threading.Thread):
    """Polls pnut channels and relays new posts into the bridged Matrix rooms."""

    def __init__(self):
        threading.Thread.__init__(self)
        self.matrix_api_url = app.config['MATRIX_HOST'] + '/_matrix/client/r0'
        self.matrix_api_token = app.config['MATRIX_AS_TOKEN']
        self.txId = 0

    def generate_matrix_id(self, username):
        """Build the Matrix localpart, full user ID and display name for a pnut user."""
        m_username = app.config['MATRIX_PNUT_PREFIX'] + username
        m_userid = "@" + m_username + ":" + app.config['MATRIX_DOMAIN']
        m_display = username + " (pnut)"
        return {'username': m_username, 'user_id': m_userid, 'displayname': m_display}

    def is_registered(self, user_id):
        """Return the user's display name if a profile exists, otherwise False."""
        url = self.matrix_api_url + '/profile/' + user_id
        r = requests.get(url)
        if r.status_code == 200:
            rdata = r.json()
            if 'displayname' in rdata:
                return rdata['displayname']
        return False

    def create_matrix_user(self, username):
        url = self.matrix_api_url + '/register'
        params = {'access_token': self.matrix_api_token}
        data = {
            'type': 'm.login.application_service',
            'user': username
        }
        headers = {'Content-Type': 'application/json'}
        r = requests.post(url, params=params, data=json.dumps(data), headers=headers)
        if r.status_code == 200:
            logging.info('REGISTERED USER: ' + username)
            logging.info(r.text)

    def set_displayname(self, muser):
        url = self.matrix_api_url + '/profile/' + muser['user_id'] + '/displayname'
        params = {
            'access_token': self.matrix_api_token,
            'user_id': muser['user_id']
        }
        data = {'displayname': muser['displayname']}
        headers = {'Content-Type': 'application/json'}
        r = requests.put(url, params=params, data=json.dumps(data), headers=headers)

    def join_room(self, user_id, roomid):
        url = self.matrix_api_url + '/join/' + roomid
        params = {
            'access_token': self.matrix_api_token,
            'user_id': user_id
        }
        r = requests.post(url, params=params)

    def send_message(self, roomid, msg):
        """Send one queued pnut post to the room and record the resulting event."""
        url = (self.matrix_api_url + '/rooms/' + roomid
               + '/send/m.room.message' + '/' + str(self.txId))
        params = {
            'access_token': self.matrix_api_token,
            'user_id': msg['user']['user_id'],
            'ts': msg['ts']
        }
        data = {
            'msgtype': 'm.text',
            'body': msg['text']
        }
        headers = {'Content-Type': 'application/json'}
        r = requests.put(url, params=params, data=json.dumps(data), headers=headers)
        if r.status_code == 200:
            eventid = r.json()['event_id']
            el = MatrixMsgEvents(eventid, roomid, msg['msgid'], msg['puser'], msg['chan'])
            db.session.add(el)
            db.session.commit()

    def poll_channel(self, room):
        """Fetch posts newer than room.pnut_since and relay them oldest first."""
        r = Pnut(app.config['MATRIX_PNUT_TOKEN']).get_channel_stream(room.pnut_chan, room.pnut_since)
        if r is not None and r.status_code == 200:
            rdata = r.json()
            pqueue = []

            for post in rdata['data']:
                # Skip posts that originated from the bridge itself.
                if post['user']['username'] == app.config['MATRIX_PNUT_USER']:
                    continue
                if post['source']['id'] == app.config['PNUTCLIENT_ID']:
                    continue

                if 'content' in post:
                    user = self.generate_matrix_id(post['user']['username'])
                    text = post['content']['text'] + "\n"
                    dt = datetime.datetime.strptime(post["created_at"], "%Y-%m-%dT%H:%M:%SZ")
                    # created_at is UTC, so use timegm rather than mktime (which
                    # assumes local time). Matrix expects the ts parameter in
                    # milliseconds since the epoch.
                    ts = calendar.timegm(dt.timetuple()) * 1000
                    msgid = post['id']
                    puser = post['user']['username']

                    # handle entities
                    if 'entities' in post['content']:
                        if 'links' in post['content']['entities']:
                            for lnk in post['content']['entities']['links']:
                                text += "\n"
                                if 'title' in lnk:
                                    text += lnk['title'] + "\n"
                                if 'link' in lnk:
                                    text += lnk['link'] + "\n"

                    # handle raw data
                    if 'raw' in post:
                        for raw in post['raw']:
                            text += "\n"
                            if raw['type'] == 'io.pnut.core.oembed':
                                if 'title' in raw['value']:
                                    text += raw['value']['title'] + "\n"
                                if 'url' in raw['value']:
                                    text += raw['value']['url'] + "\n"

                    # The stream is newest first; insert at the front so the
                    # queue ends up in chronological order.
                    pqueue.insert(0, {'user': user, 'text': text, 'ts': ts,
                                      'msgid': msgid, 'puser': puser, 'chan': room.pnut_chan})

            if len(rdata["data"]) > 0:
                room.pnut_since = rdata["data"][0]["id"]
                db.session.commit()

            for item in pqueue:
                self.txId += 1
                d = self.is_registered(item['user']['user_id'])
                if not d:
                    self.create_matrix_user(item['user']['username'])
                if d != item['user']['displayname']:
                    self.set_displayname(item['user'])
                self.join_room(item['user']['user_id'], room.room_id)
                time.sleep(1)
                self.send_message(room.room_id, item)

    def run(self):
        logging.info("-- Starting channel monitor --")
        app.app_context().push()
        rooms = MatrixRoom.query.all()
        # Seed the transaction ID from the first room, if any room is configured.
        if rooms:
            self.txId = int(rooms[0].pnut_since)
        while not _shutdown.is_set():
            rooms = MatrixRoom.query.all()
            for r in rooms:
                self.poll_channel(r)
            # Wait on the event instead of sleeping so shutdown is not delayed.
            _shutdown.wait(15)
        logging.info("-- Stopping channel monitor --")


if __name__ == '__main__':
    with open("config.yaml", "rb") as config_file:
        # safe_load avoids constructing arbitrary Python objects from the file.
        config = yaml.safe_load(config_file)

    logging.basicConfig(level=logging.INFO)
    app.config.update(config)

    monitor = ChannelMonitor()
    monitor.start()
    app.run(port=config['LISTEN_PORT'])

    # app.run() returns when the web server stops; signal the monitor to exit.
    _shutdown.set()
    logging.info("-- Shutdown in progress --")