"""Bridge pnut.io chat channels to Matrix rooms.

A websocket connection to the pnut.io app stream delivers channel messages;
each one is relayed into the mapped Matrix room on behalf of a per-user
"ghost" account registered through the application service.

NOTE: ``config`` and ``matrix_url`` are module globals that are only assigned
in the ``__main__`` section at the bottom of this file, so the functions here
are only usable when the module is run as a script.
"""
import calendar
import json
import logging
import threading
import time

import magic
import pnutpy
import requests
import websocket
import yaml
from matrix_client.api import MatrixHttpApi
from matrix_client.api import MatrixError, MatrixRequestError
from sqlalchemy import and_

from appservice import app
from database import db_session, init_db
from models import Avatars, Rooms, Events

logger = logging.getLogger()

# Set once the HTTP app service has stopped; tells the websocket threads to
# stop sending keepalives / reconnecting.
_shutdown = threading.Event()

# Seconds to wait before reconnecting after the stream connection drops.
_RECONNECT_DELAY = 10
# Seconds between keepalive frames sent on the stream connection.
_KEEPALIVE_INTERVAL = 3


def _matrix_api(identity=None):
    """Return a Matrix client authenticated with the app service token.

    :param identity: optional Matrix user id to act as; when omitted the
        client acts as the application service itself.
    """
    return MatrixHttpApi(config['MATRIX_HOST'],
                         token=config['MATRIX_AS_TOKEN'],
                         identity=identity)


def _matrix_timestamp(created_at):
    """Convert a message datetime to epoch milliseconds.

    The previous ``strftime('%s')`` is not a documented Python directive, is
    unavailable on some platforms, and interprets the value as host-local
    time.  ``utctimetuple`` honours the tzinfo of an aware datetime and
    treats a naive one as UTC.
    NOTE(review): assumes pnut timestamps are UTC -- confirm against pnutpy.
    """
    return calendar.timegm(created_at.utctimetuple()) * 1000


def _fetch_media(url):
    """Download ``url`` and sniff its MIME type.

    :returns: ``(content_bytes, mimetype)``
    :raises requests.HTTPError: if the server answers with an error status.
    """
    response = requests.get(url, stream=True)
    response.raise_for_status()
    content = response.content
    with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as sniffer:
        mimetype = sniffer.id_buffer(content)
    return content, mimetype


def _record_event(event_id, room_id, msg):
    """Persist the mapping between a Matrix event and its pnut message."""
    event = Events(
        event_id=event_id,
        room_id=room_id,
        pnut_msg_id=msg.id,
        pnut_user_id=msg.user.id,
        pnut_chan_id=msg.channel_id,
        deleted=False)
    db_session.add(event)
    db_session.commit()


def new_message(msg):
    """Relay a new pnut channel message into its mapped Matrix room.

    Messages authored by the bridge itself, and messages in channels with no
    room mapping, are ignored.  The Matrix ghost user is created and its
    display name / avatar refreshed when they are out of date.
    """
    # Lazy %-formatting: identical output, but no TypeError on non-str values.
    logger.debug("channel: %s", msg.channel_id)
    logger.debug("username: %s", msg.user.username)
    if 'name' in msg.user:
        logger.debug("name: %s", msg.user.name)
    # The text was previously logged unconditionally, before the
    # "'content' in msg" check further down.
    if 'content' in msg:
        logger.debug("text: %s", msg.content.text)

    # ignore messages posted by the bridge
    if msg.user.username == config['MATRIX_PNUT_USER']:
        return
    if msg.source.id == config['PNUTCLIENT_ID']:
        return

    room = Rooms.query.filter(Rooms.pnut_chan == msg.channel_id).one_or_none()
    logger.debug(room)
    if room is None:
        logger.debug('-not_mapped-')
        return

    matrix_id = matrix_id_from_pnut(msg.user.username)
    matrix_api = _matrix_api(matrix_id)

    profile = get_matrix_profile(matrix_id)
    if not profile:
        new_matrix_user(msg.user.username)
        logger.debug('-new_user-')
        profile = {'displayname': None}

    # .get(): a profile response may lack a display name entirely.
    if profile.get('displayname') != matrix_display_from_pnut(msg.user):
        set_matrix_display(msg.user)
        logger.debug('-set_display-')

    avatar = Avatars.query.filter(
        Avatars.pnut_user == msg.user.username).one_or_none()
    if avatar is None or avatar.avatar != msg.user.content.avatar_image.link:
        set_matrix_avatar(msg.user)
        logger.debug('-set_avatar-')

    # TODO: sort out room invite and join logic
    join_room(room.room_id, matrix_id)

    if 'content' not in msg:
        return

    link_lines = []
    for link in msg.content.entities.links:
        if 'title' in link:
            link_lines.append(link.title + "\n")
        if 'link' in link:
            link_lines.append(link.link + "\n")

    text = msg.content.text + "\n"
    if link_lines:
        text += "\n" + "".join(link_lines)

    r = matrix_api.send_message(room.room_id, text,
                                timestamp=_matrix_timestamp(msg.created_at))
    _record_event(r['event_id'], room.room_id, msg)

    if msg.raw:
        logger.debug('-handle media uploads-')
        new_media(room.room_id, msg)


def new_media(room_id, msg):
    """Upload every oembed attachment of ``msg`` and post it to ``room_id``.

    Only raw items of type ``io.pnut.core.oembed`` that carry a ``url`` are
    handled; each becomes its own Matrix event.
    """
    matrix_id = matrix_id_from_pnut(msg.user.username)
    matrix_api = _matrix_api(matrix_id)
    ts = _matrix_timestamp(msg.created_at)

    for item in msg.raw:
        if item.type != 'io.pnut.core.oembed' or 'url' not in item.value:
            continue

        content, mtype = _fetch_media(item.value.url)
        ul = matrix_api.media_upload(content, mtype)

        # 'size' is common to every message type; the rest depends on kind.
        info = {'mimetype': mtype, 'size': len(content)}
        kind = item.value.type
        if kind == 'photo':
            msgtype = 'm.image'
            info['h'] = item.value.height
            info['w'] = item.value.width
        elif kind in ('video', 'html5video'):
            msgtype = 'm.video'
            info['h'] = item.value.height
            info['w'] = item.value.width
        elif kind == 'audio':
            msgtype = 'm.audio'
            # The value is multiplied by 1000 before being sent to Matrix.
            info['duration'] = int(item.value.duration) * 1000
        else:
            msgtype = 'm.file'

        r = matrix_api.send_content(room_id, ul['content_uri'],
                                    item.value.title, msgtype,
                                    extra_information=info, timestamp=ts)
        _record_event(r['event_id'], room_id, msg)


def delete_message(msg):
    """Redact every not-yet-deleted Matrix event created from ``msg``."""
    matrix_api = _matrix_api(matrix_id_from_pnut(msg.user.username))

    # "== False" is intentional: it builds a SQL expression, not a Python bool.
    events = Events.query.filter(
        and_(Events.pnut_msg_id == msg.id,
             Events.deleted == False)).all()  # noqa: E712
    for event in events:
        matrix_api.redact_event(event.room_id, event.event_id)
        event.deleted = True
        db_session.commit()


def matrix_id_from_pnut(username):
    """Build the Matrix ghost user id for a pnut username."""
    return ("@" + config['MATRIX_PNUT_PREFIX'] + username
            + ":" + config['MATRIX_DOMAIN'])


def matrix_display_from_pnut(user):
    """Build the Matrix display name for a pnut user.

    Uses ``"<name> <@username> (pnut)"`` when the user has a name, otherwise
    just ``"@username"``.
    """
    if 'name' in user:
        return user.name + " <@" + user.username + "> (pnut)"
    return "@" + user.username


def get_matrix_profile(matrix_id):
    """Fetch the Matrix profile of ``matrix_id``.

    :returns: the decoded JSON profile, or ``None`` on any non-200 response.
    """
    r = requests.get(matrix_url + '/profile/' + matrix_id)
    if r.status_code != 200:
        return None
    return r.json()


def set_matrix_display(user):
    """Set the ghost user's Matrix display name from the pnut user."""
    matrix_id = matrix_id_from_pnut(user.username)
    _matrix_api(matrix_id).set_display_name(
        matrix_id, matrix_display_from_pnut(user))


def set_matrix_avatar(user):
    """Copy the pnut avatar to Matrix and remember which image was used.

    The avatar link is stored in ``Avatars`` so later messages can detect a
    change.  A failed Matrix request is logged and otherwise ignored.
    """
    matrix_id = matrix_id_from_pnut(user.username)
    matrix_api = _matrix_api(matrix_id)
    avatar_link = user.content.avatar_image.link

    content, mtype = _fetch_media(avatar_link)
    ul = matrix_api.media_upload(content, mtype)

    try:
        matrix_api.set_avatar_url(matrix_id, ul['content_uri'])
        avatar = Avatars.query.filter(
            Avatars.pnut_user == user.username).one_or_none()
        if avatar is None:
            avatar = Avatars(pnut_user=user.username, avatar=avatar_link)
            db_session.add(avatar)
        else:
            avatar.avatar = avatar_link
        db_session.commit()
    except MatrixRequestError:
        logger.exception('failed to set user avatar')


def new_matrix_user(username):
    """Register the Matrix ghost account for a pnut username."""
    data = {
        'type': 'm.login.application_service',
        'user': config['MATRIX_PNUT_PREFIX'] + username
    }
    _matrix_api().register(content=data)


def join_room(room_id, matrix_id):
    """Join ``matrix_id`` to ``room_id``, inviting it first if required.

    On a 403 from the join, the application service invites the user and the
    join is retried once; any other Matrix error is logged.
    """
    matrix_api_as = _matrix_api()
    matrix_api = _matrix_api(matrix_id)

    try:
        matrix_api.join_room(room_id)
    except MatrixRequestError as e:
        if e.code == 403:
            matrix_api_as.invite_user(room_id, matrix_id)
            matrix_api.join_room(room_id)
        else:
            logger.exception('failed to join room')

    logger.debug('-room_join-')


def on_message(ws, message):
    """Websocket callback: dispatch one pnut stream frame."""
    msg = json.loads(message)
    meta = msg['meta']
    logger.debug(meta)

    if 'data' not in msg or meta['type'] != "message":
        return

    # TODO: bypassed other channel types for now
    if meta['channel_type'] != 'io.pnut.core.chat':
        return

    pmsg = pnutpy.models.Message.from_response_data(msg['data'])

    if 'is_deleted' not in meta:
        new_message(pmsg)
    elif meta['is_deleted']:
        logger.debug("message: delete")
        delete_message(pmsg)
    else:
        logger.debug("uh whut?")


def on_error(ws, error):
    """Websocket callback: log a stream error."""
    logger.error("on_error: !!! ERROR !!!")
    logger.error(error)


def on_close(ws, *args):
    """Websocket callback: log that the stream connection closed.

    Reconnection is handled by ``_run_websocket``.  The previous version
    called ``_shutdown.set()`` where it meant to test the flag, which
    signalled shutdown on every disconnect, and it tried to ``start()`` a
    thread that had already been started, which raises ``RuntimeError``.

    ``*args`` absorbs the close status and message that newer
    websocket-client releases pass to this callback.
    """
    logger.debug("on_close: ### CLOSED ###")


def on_open(ws):
    """Websocket callback: start the keepalive thread for this connection."""
    def run(*args):
        # wait() doubles as the keepalive interval and returns True as soon
        # as shutdown is requested.
        while not _shutdown.wait(_KEEPALIVE_INTERVAL):
            try:
                ws.send(".")
            except (websocket.WebSocketException, OSError):
                # The connection is gone; the next successful connect calls
                # on_open again and starts a fresh keepalive thread.
                logger.debug("keepalive stopped: connection closed")
                return
        time.sleep(1)
        ws.close()
        logger.debug("*** terminate ***")

    keepalive = threading.Thread(target=run)
    keepalive.start()


def _run_websocket(ws):
    """Keep the stream connected, reconnecting until shutdown is requested."""
    while not _shutdown.is_set():
        ws.run_forever()
        # Back off before reconnecting; stop immediately on shutdown.
        if _shutdown.wait(_RECONNECT_DELAY):
            break


if __name__ == '__main__':
    # websocket.enableTrace(True)
    logging.basicConfig(level=logging.INFO)

    with open("config.yaml", "rb") as config_file:
        # safe_load: plain yaml.load() can construct arbitrary objects and
        # requires an explicit Loader on current PyYAML releases.
        config = yaml.safe_load(config_file)

    ws_url = 'wss://stream.pnut.io/v0/app?access_token='
    ws_url += config['PNUT_APPTOKEN'] + '&key=' + config['PNUT_APPKEY']
    ws_url += '&include_raw=1'
    matrix_url = config['MATRIX_HOST'] + '/_matrix/client/r0'

    # setup the database connection
    init_db()

    # setup the websocket connection
    ws = websocket.WebSocketApp(ws_url, on_message=on_message,
                                on_error=on_error, on_close=on_close)
    ws.on_open = on_open
    t = threading.Thread(target=_run_websocket, args=(ws,))
    t.daemon = True
    t.start()

    # setup the matrix app service
    if config['MATRIX_ADMIN_ROOM']:
        logger.debug("- sould join admin room -")
        join_room(config['MATRIX_ADMIN_ROOM'], config['MATRIX_AS_ID'])

    app.config.update(config)
    app.run(port=config['LISTEN_PORT'])

    # app.run() returned: the HTTP service stopped, so wind everything down.
    _shutdown.set()
    logger.info('!! shutdown initiated !!')
    time.sleep(2)