"""Relay pnut.io channel messages from the pnut app stream into Matrix rooms.

The script opens a websocket to the pnut.io app stream, mirrors new and
deleted channel messages into Matrix through the application-service token,
and runs the ``appservice`` web application in the main thread.

Module-level names ``config`` and ``matrix_url`` are assigned by the
``__main__`` section at the bottom of the file; every function that touches
Matrix or pnut reads them as globals.
"""
import argparse
import json
import logging
import logging.config
import os
import re
import threading
import time

import magic
import pnutpy
import requests
import websocket
import yaml
from matrix_client.api import MatrixHttpApi
from matrix_client.api import MatrixError, MatrixRequestError
from sqlalchemy import and_

from models import Avatars, Rooms, Events, DirectRooms, Users
from database import db_session, init_db
from appservice import app

logger = logging.getLogger()

# Set when the whole process should stop.
_shutdown = threading.Event()
# Set by the keep-alive thread when the websocket was found closed.
_reconnect = threading.Event()

# Seconds before a plain ``requests`` call gives up instead of blocking the
# stream thread forever.
HTTP_TIMEOUT = 30

# pnut channel types this bridge relays.
SUPPORTED_CHANNEL_TYPES = ('io.pnut.core.chat', 'io.pnut.core.pm')

# Debug messages for oembed types that are recognised but not bridged.
_SKIPPED_OEMBED_LOG = {
    'audio': "* recieved audio attachment",
    'video': "* recieved video attachment",
    'html5video': "* recieved html5 video attachment",
    'rich': "* recieved rich attachment",
}


class MLogFilter(logging.Filter):
    """Logging filter that strips ``access_token`` values from logged URLs.

    It rewrites the URL argument of ``werkzeug`` records (first argument) and
    of ``urllib3.connectionpool`` records (fifth argument). Records are never
    dropped; ``filter`` always returns ``True``.
    """

    ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(\s.*)$")
    ACCESS_TOKEN_RE2 = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")

    def filter(self, record):
        """Redact the token in ``record.args`` in place and keep the record."""
        args = record.args
        # ``args`` may also be a mapping (or empty); only positional tuples
        # carry the URL at a known index.
        if not isinstance(args, tuple):
            return True

        if record.name == "werkzeug" and len(args) > 0:
            if isinstance(args[0], str):
                redacted_uri = self.ACCESS_TOKEN_RE.sub(r"\1\3", args[0])
                record.args = (redacted_uri, ) + args[1:]
        elif record.name == "urllib3.connectionpool" and len(args) > 4:
            # Index 4 is read below, so at least five arguments are required.
            if isinstance(args[4], str):
                redacted_uri = self.ACCESS_TOKEN_RE2.sub(r"\1\3", args[4])
                record.args = args[:4] + (redacted_uri,) + args[5:]

        return True


def _matrix_api(identity=None):
    """Build a MatrixHttpApi using the AS token, optionally acting as *identity*."""
    return MatrixHttpApi(config['MATRIX_HOST'],
                         token=config['MATRIX_AS_TOKEN'],
                         identity=identity)


def _fetch_with_mimetype(url):
    """Download *url* and return ``(body_bytes, mime_type)``.

    Raises a ``requests`` exception on connection problems, timeouts or a
    non-success HTTP status.
    """
    dl = requests.get(url, stream=True, timeout=HTTP_TIMEOUT)
    dl.raise_for_status()
    body = dl.content
    with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
        mimetype = m.id_buffer(body)
    return body, mimetype


def _record_event(event_id, room_id, msg):
    """Persist the mapping between a Matrix event and the pnut message *msg*."""
    event = Events(
        event_id=event_id,
        room_id=room_id,
        pnut_msg_id=msg.id,
        pnut_user_id=msg.user.id,
        pnut_chan_id=msg.channel_id,
        deleted=False)
    db_session.add(event)
    db_session.commit()


def _invite_to_direct_room(msg, meta):
    """Create a direct room for an unmapped private channel.

    Registers the sender's Matrix user if it has no profile, collects the
    Matrix ids of all known subscribers other than the sender, and creates a
    room inviting them. Returns the new DirectRooms row, or ``None`` when no
    subscriber is known to the bridge or room creation failed.
    """
    logger.debug('new invite?')
    subscribers = meta.get('subscribed_user_ids', [])
    logger.debug(subscribers)

    pnut_user = matrix_id_from_pnut(msg.user.username)
    if get_matrix_profile(pnut_user) is None:
        new_matrix_user(msg.user.username)

    # Compare as strings so the sender is skipped whether ids arrive as
    # integers or as strings.
    sender_id = str(msg.user.id)
    invitees = []
    for pm_user in subscribers:
        if str(pm_user) == sender_id:
            continue
        user = Users.query.filter(Users.pnut_user_id == pm_user).one_or_none()
        if user is not None:
            invitees.append(user.matrix_id)

    if not invitees:
        return None
    return new_room(pnut_user, invitees, msg.channel_id)


def _lookup_room(msg, meta):
    """Return the room row mapped to the message's channel, or ``None``.

    NOTE(review): the invite attempt is applied to private-message channels
    only; the flattened original left the nesting ambiguous - confirm.
    """
    channel_type = meta.get('channel_type')
    if channel_type == 'io.pnut.core.chat':
        return Rooms.query.filter(
            Rooms.pnut_chan == msg.channel_id).one_or_none()
    if channel_type == 'io.pnut.core.pm':
        room = DirectRooms.query.filter(
            DirectRooms.pnut_chan == msg.channel_id).one_or_none()
        if room is None:
            room = _invite_to_direct_room(msg, meta)
        return room
    # Any other (or missing) channel type is not bridged.
    return None


def _sync_matrix_user(user, matrix_id):
    """Make sure the Matrix user for *user* exists with current name and avatar."""
    profile = get_matrix_profile(matrix_id)
    if profile is None:
        new_matrix_user(user.username)
        logger.debug('-new_user-')
        profile = {}

    # A profile without a display name counts as "needs updating".
    if profile.get('displayname') != matrix_display_from_pnut(user):
        set_matrix_display(user)
        logger.debug('-set_display-')

    avatar = Avatars.query.filter(
        Avatars.pnut_user == user.username).one_or_none()
    if avatar is None or avatar.avatar != user.content.avatar_image.url:
        set_matrix_avatar(user)
        logger.debug('-set_avatar-')


def _message_text(msg):
    """Return the message text followed by the titles and URLs of its links."""
    text = msg.content.text + "\n"
    link_lines = []
    for link in msg.content.entities.links:
        if 'title' in link:
            link_lines.append(link.title + "\n")
        if 'url' in link:
            link_lines.append(link.url + "\n")
    if link_lines:
        text += "\n" + "".join(link_lines)
    return text


def new_message(msg, meta):
    """Relay a new pnut message into its mapped Matrix room.

    Messages written by the bridge's own pnut user or client are ignored, as
    are messages for channels with no mapped room (after attempting to set up
    a direct room for private channels).

    Args:
        msg: pnutpy message object from the stream.
        meta: the ``meta`` mapping of the stream packet the message came in.
    """
    logger.debug("channel: %s", msg.channel_id)
    logger.debug("username: %s", msg.user.username)
    if 'name' in msg.user:
        logger.debug("name: %s", msg.user.name)
    if 'content' in msg:
        logger.debug("text: %s", msg.content.text)

    # ignore messages posted by the bridge
    if msg.user.username == config['MATRIX_PNUT_USER']:
        return
    if msg.source.id == config['PNUTCLIENT_ID']:
        return

    room = _lookup_room(msg, meta)
    logger.debug(room)
    if room is None:
        logger.debug('-not_mapped-')
        return

    matrix_id = matrix_id_from_pnut(msg.user.username)
    matrix_api = _matrix_api(matrix_id)
    _sync_matrix_user(msg.user, matrix_id)

    # TODO: sort out room invite and join logic
    join_room(room.room_id, matrix_id)

    if 'content' in msg:
        ts = int(time.time()) * 1000
        r = matrix_api.send_message(room.room_id, _message_text(msg),
                                    timestamp=ts)
        _record_event(r['event_id'], room.room_id, msg)

        # NOTE(review): media is handled only for messages that carry
        # content; the flattened original left this nesting ambiguous.
        if 'raw' in msg:
            logger.debug('-handle media uploads-')
            new_media(room.room_id, msg)


def new_media(room_id, msg):
    """Upload the photo attachments of *msg* to Matrix and post them to *room_id*.

    Only ``photo`` oembed entries are bridged; other types are logged and
    skipped. An attachment that cannot be downloaded is logged and skipped so
    the remaining attachments are still delivered.
    """
    if 'io.pnut.core.oembed' not in msg.raw:
        return

    matrix_id = matrix_id_from_pnut(msg.user.username)
    matrix_api = _matrix_api(matrix_id)
    ts = int(time.time()) * 1000

    for oembed in msg.raw['io.pnut.core.oembed']:
        if oembed.type != 'photo':
            logger.debug(_SKIPPED_OEMBED_LOG.get(
                oembed.type, "* recieved unknown attachment"))
            continue

        try:
            body, mimetype = _fetch_with_mimetype(oembed.url)
        except requests.RequestException:
            logger.exception('failed to download attachment')
            continue

        info = {
            'h': oembed.height,
            'w': oembed.width,
            'mimetype': mimetype,
            'size': len(body),
        }
        ul = matrix_api.media_upload(body, mimetype)
        title = oembed.title if 'title' in oembed else ""

        r = matrix_api.send_content(room_id, ul['content_uri'], title,
                                    'm.image', extra_information=info,
                                    timestamp=ts)
        _record_event(r['event_id'], room_id, msg)


def delete_message(msg):
    """Redact every not-yet-deleted Matrix event recorded for pnut message *msg*."""
    matrix_id = matrix_id_from_pnut(msg.user.username)
    matrix_api = _matrix_api(matrix_id)

    # ``== False`` is intentional: it builds a SQL comparison expression.
    events = Events.query.filter(
        and_(Events.pnut_msg_id == msg.id,
             Events.deleted == False)).all()  # noqa: E712
    for event in events:
        matrix_api.redact_event(event.room_id, event.event_id)
        event.deleted = True
        # Commit per event so finished redactions stay recorded even if a
        # later one raises.
        db_session.commit()


def matrix_id_from_pnut(username):
    """Return the full Matrix user id used for the pnut user *username*."""
    return ("@" + config['MATRIX_PNUT_PREFIX'] + username + ":"
            + config['MATRIX_DOMAIN'])


def matrix_display_from_pnut(user):
    """Return the Matrix display name for *user*: ``Name (@username)`` or ``@username``."""
    if 'name' in user:
        return user.name + " (@" + user.username + ")"
    return "@" + user.username


def get_matrix_profile(matrix_id):
    """Fetch the profile of *matrix_id*.

    Returns the decoded JSON body on HTTP 200, otherwise ``None``. Note that
    an existing user may have an empty profile (``{}``), so callers should
    test the result with ``is None``.
    """
    url = matrix_url + '/profile/' + matrix_id
    r = requests.get(url, timeout=HTTP_TIMEOUT)
    if r.status_code == 200:
        return r.json()
    return None


def set_matrix_display(user):
    """Set the Matrix display name of the user bridged from pnut *user*."""
    matrix_id = matrix_id_from_pnut(user.username)
    matrix_api = _matrix_api(matrix_id)
    matrix_api.set_display_name(matrix_id, matrix_display_from_pnut(user))


def set_matrix_avatar(user):
    """Copy the pnut avatar of *user* to Matrix and remember its source URL.

    The avatar is cosmetic, so download and Matrix request failures are
    logged rather than raised; the stored URL is only updated on success.
    """
    matrix_id = matrix_id_from_pnut(user.username)
    matrix_api = _matrix_api(matrix_id)
    avatar_url = user.content.avatar_image.url

    try:
        body, mtype = _fetch_with_mimetype(avatar_url)
    except requests.RequestException:
        logger.exception('failed to download user avatar')
        return

    try:
        ul = matrix_api.media_upload(body, mtype)
        matrix_api.set_avatar_url(matrix_id, ul['content_uri'])
        avatar = Avatars.query.filter(
            Avatars.pnut_user == user.username).one_or_none()
        if avatar is None:
            avatar = Avatars(pnut_user=user.username, avatar=avatar_url)
            db_session.add(avatar)
        else:
            avatar.avatar = avatar_url
        db_session.commit()
    except MatrixRequestError:
        logger.exception('failed to set user avatar')


def new_matrix_user(username):
    """Register the application-service Matrix user for pnut *username*."""
    matrix_api = _matrix_api()
    data = {
        'type': 'm.login.application_service',
        'username': config['MATRIX_PNUT_PREFIX'] + username
    }
    matrix_api.register(content=data)


def join_room(room_id, matrix_id):
    """Join *matrix_id* to *room_id*, inviting it first if the join is forbidden.

    A 403 on the first attempt triggers an invite sent with the plain
    application-service token followed by one retry (whose errors propagate).
    Any other join error is logged and swallowed.
    """
    matrix_api_as = _matrix_api()
    matrix_api = _matrix_api(matrix_id)

    try:
        matrix_api.join_room(room_id)
    except MatrixRequestError as e:
        if e.code == 403:
            matrix_api_as.invite_user(room_id, matrix_id)
            matrix_api.join_room(room_id)
        else:
            logger.exception('failed to join room')

    logger.debug('-room_join-')


def new_room(pnut_user, invitees, chan):
    """Create a private direct Matrix room and map it to pnut channel *chan*.

    Args:
        pnut_user: Matrix id of the bridged pnut user creating the room.
        invitees: Matrix ids to invite.
        chan: pnut channel id the room is mapped to.

    Returns:
        The stored DirectRooms row, or ``None`` when there is nobody to
        invite or the homeserver rejected the request.
    """
    if not invitees:
        return None

    url = matrix_url + '/createRoom'
    params = {"access_token": config['MATRIX_AS_TOKEN'], "user_id": pnut_user}
    content = {"visibility": "private", "is_direct": True, "invite": invitees}
    headers = {"Content-Type": "application/json"}
    r = requests.post(url, headers=headers, params=params,
                      data=json.dumps(content), timeout=HTTP_TIMEOUT)
    if r.status_code != 200:
        logger.error('failed to create room: %s %s', r.status_code, r.text)
        return None
    response = r.json()

    # Exactly one mapping row per channel: lookups use one_or_none() on
    # pnut_chan and would raise if several identical rows existed.
    dr = DirectRooms(room_id=response['room_id'], bridge_user=pnut_user,
                     pnut_chan=chan)
    logger.debug(dr)
    db_session.add(dr)
    db_session.commit()

    return dr


def on_message(ws, message):
    """Websocket callback: dispatch one stream packet.

    Packets without ``data`` or for unsupported channel types are ignored.
    Each item is parsed as a pnut message; packets flagged ``is_deleted``
    trigger redaction, packets without that flag are relayed as new messages.
    """
    msg = json.loads(message)
    meta = msg['meta']
    logger.debug(meta)

    if 'data' not in msg:
        return
    # NOTE(review): packets lacking a channel_type are skipped as well; the
    # flattened original left this nesting ambiguous - confirm.
    if meta.get('channel_type') not in SUPPORTED_CHANNEL_TYPES:
        return

    for d_item in msg['data']:
        pmsg = pnutpy.models.Message.from_response_data(d_item)

        if 'is_deleted' in meta:
            # A present-but-false flag does nothing, as before.
            if meta['is_deleted']:
                logger.debug("message: delete")
                delete_message(pmsg)
        else:
            new_message(pmsg, meta)


def on_error(ws, error):
    """Websocket callback: log a stream error."""
    logger.error("on_error: !!! ERROR !!!")
    logger.error(error)


def on_close(ws, *args):
    """Websocket callback: log that the stream closed.

    Extra positional arguments are accepted and ignored because
    websocket-client releases differ in whether they pass a close status
    code and message.
    """
    logger.debug("on_close: ### CLOSED ###")


def on_open(ws):
    """Websocket callback: start a thread that sends "." every 3 seconds.

    The thread ends once shutdown or reconnect is signalled; it signals
    reconnect itself when sending fails because the socket is closed.
    """

    def run(*args):
        while not _shutdown.is_set() and not _reconnect.is_set():
            time.sleep(3)
            try:
                ws.send(".")
            except websocket.WebSocketConnectionClosedException:
                logger.debug('websocket closed exception caught...')
                _reconnect.set()

        time.sleep(1)
        logger.debug("*** terminate thread ***")

    t = threading.Thread(target=run)
    t.start()


def wsthreader(threadfunc):
    """Wrap *threadfunc* in a loop that re-runs it until shutdown.

    After each run a truthy return value causes a 5 second pause before the
    next run; a falsy return value signals shutdown.
    """

    def wrapper():
        while not _shutdown.is_set():
            _reconnect.clear()
            logger.debug('threadfunc start...')
            running = threadfunc()
            logger.debug('threadfunc end...')
            if running:
                time.sleep(5)
            else:
                _shutdown.set()
        logger.debug('*** thread stopped ***')

    return wrapper


if __name__ == '__main__':
    a_parser = argparse.ArgumentParser()
    # NOTE(review): the parsed ``debug`` flag is not read anywhere in this
    # file.
    a_parser.add_argument(
        '-d', action='store_true', dest='debug',
        help="debug logging"
    )
    # TODO: solve the database.py problem and enable this
    # a_parser.add_argument(
    #     '-c', '--config', default="config.yaml",
    #     help="configuration file"
    # )
    args = a_parser.parse_args()

    configyaml = os.environ.get("CONFIG_FILE")
    if not configyaml:
        # Fail with a readable message instead of a TypeError from open(None).
        a_parser.error(
            "the CONFIG_FILE environment variable must name the "
            "configuration file")

    with open(configyaml, "rb") as config_file:
        config = yaml.load(config_file, Loader=yaml.SafeLoader)

    # websocket.enableTrace(True)
    logging.config.dictConfig(config['logging'])
    redact_filter = MLogFilter()
    logging.getLogger("werkzeug").addFilter(redact_filter)
    logging.getLogger("urllib3.connectionpool").addFilter(redact_filter)

    ws_url = 'wss://stream.pnut.io/v1/app?access_token='
    ws_url += config['PNUT_APPTOKEN'] + '&key=' + config['PNUT_APPKEY']
    ws_url += '&include_raw=1'
    matrix_url = config['MATRIX_HOST'] + '/_matrix/client/r0'

    # setup the database connection
    init_db()

    # setup the websocket connection
    ws = websocket.WebSocketApp(ws_url, on_message=on_message,
                                on_error=on_error, on_close=on_close)
    ws.on_open = on_open
    wst = threading.Thread(target=wsthreader(ws.run_forever))
    wst.start()

    try:
        # setup the matrix app service
        if config.get('MATRIX_ADMIN_ROOM'):
            logger.debug("- sould join admin room -")
            join_room(config['MATRIX_ADMIN_ROOM'], config['MATRIX_AS_ID'])
        app.config.update(config)
        app.run(host=config['LISTEN_HOST'], port=config['LISTEN_PORT'])
    finally:
        # Always stop the stream thread, even when startup or the web app
        # raised; otherwise the non-daemon thread keeps the process alive.
        logger.info('!! shutdown initiated !!')
        _shutdown.set()
        ws.close()
        time.sleep(2)