pnut-matrix/pnut-matrix.py

426 lines
14 KiB
Python

import websocket
import threading
import time
import logging
import logging.config
import yaml
import json
import pnutpy
import requests
import magic
import argparse
import os
import re
from matrix_client.api import MatrixHttpApi
from matrix_client.api import MatrixError, MatrixRequestError
from models import Avatars, Rooms, Events, DirectRooms, Users
from database import db_session, init_db
from sqlalchemy import and_
from appservice import app
# Root logger; its handlers/levels are installed from the 'logging' section of
# the configuration file when the script starts.
logger = logging.getLogger()
# Set when the bridge is shutting down; polled by the websocket worker loops.
_shutdown = threading.Event()
# Set by the keepalive thread when the websocket is found closed; cleared
# again right before every (re)connection attempt.
_reconnect = threading.Event()
class MLogFilter(logging.Filter):
    """Logging filter that redacts access tokens from logged request URIs.

    Only records from the ``werkzeug`` and ``urllib3.connectionpool`` loggers
    are rewritten; every record is always allowed through (``filter`` never
    returns ``False``).
    """

    # werkzeug logs the whole request line ("GET /path?... HTTP/1.1") as the
    # first argument, so the token ends at the next whitespace.
    ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(\s.*)$")
    # urllib3 logs the bare URL, so the token ends at '&' or end of string.
    ACCESS_TOKEN_RE2 = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")

    def filter(self, record):
        """Redact ``access_token`` values in *record* and return ``True``.

        Records whose ``args`` are not a tuple (``None`` or a mapping), are
        too short, or do not carry a string at the expected position are
        passed through unchanged instead of raising.
        """
        args = record.args
        if not isinstance(args, tuple):
            return True

        if record.name == "werkzeug" and len(args) > 0:
            if isinstance(args[0], str):
                redacted_uri = MLogFilter.ACCESS_TOKEN_RE.sub(
                    r"\1<redacted>\3", args[0])
                record.args = (redacted_uri,) + args[1:]
        # The URL is the fifth argument (index 4), so at least five arguments
        # are required; the previous "> 3" check raised IndexError on
        # four-argument records.
        elif record.name == "urllib3.connectionpool" and len(args) > 4:
            if isinstance(args[4], str):
                redacted_uri = MLogFilter.ACCESS_TOKEN_RE2.sub(
                    r"\1<redacted>\3", args[4])
                record.args = args[:4] + (redacted_uri,) + args[5:]

        return True
def _room_for_message(msg, meta):
    """Return the bridged room record for *msg*, or ``None`` if unmapped.

    Chat channels are looked up in ``Rooms``. Private channels are looked up
    in ``DirectRooms``; when no mapping exists yet, a new direct room is
    created (via ``new_room``) inviting every subscribed user that has a
    ``Users`` record, excluding the sender.
    """
    channel_type = meta.get('channel_type')
    if channel_type == 'io.pnut.core.chat':
        return Rooms.query.filter(
            Rooms.pnut_chan == msg.channel_id).one_or_none()
    if channel_type != 'io.pnut.core.pm':
        # Unknown or missing channel type: nothing to bridge.
        return None

    room = DirectRooms.query.filter(
        DirectRooms.pnut_chan == msg.channel_id).one_or_none()
    if room is not None:
        return room

    # Do do an invite from the bridge user?
    logger.debug('new invite?')
    # create room and include matrix recipients
    # subscribed_user_ids from meta
    logger.debug(meta['subscribed_user_ids'])
    pnut_user = matrix_id_from_pnut(msg.user.username)
    if not get_matrix_profile(pnut_user):
        new_matrix_user(msg.user.username)

    invitees = []
    for pm_user in meta['subscribed_user_ids']:
        # Compare as strings so the sender is skipped whether the ids arrive
        # as ints or as strings.
        if str(pm_user) == str(msg.user.id):
            continue
        user = Users.query.filter(
            Users.pnut_user_id == pm_user).one_or_none()
        if user is not None:
            invitees.append(user.matrix_id)

    if not invitees:
        return None
    return new_room(pnut_user, invitees, msg.channel_id)


def _format_message_text(content):
    """Build the Matrix message body from a pnut message ``content`` object.

    The text is followed by a newline and, when the message carries link
    entities, a blank line plus one line per link title / url.
    """
    text = content.text + "\n"
    link_lines = []
    for link in content.entities.links:
        if 'title' in link:
            link_lines.append(link.title)
        if 'url' in link:
            link_lines.append(link.url)
    if link_lines:
        text += "\n" + "\n".join(link_lines) + "\n"
    return text


def new_message(msg, meta):
    """Relay a pnut channel message into its bridged Matrix room.

    Args:
        msg: the pnut message (supports attribute access and ``in`` tests).
        meta: the ``meta`` dict that accompanied the message on the stream.

    Messages posted by the bridge itself, and messages for channels without a
    room mapping, are ignored. Otherwise the sender's Matrix puppet user is
    created/updated (display name, avatar), joined to the room, the text is
    sent and the resulting event is recorded in ``Events``. Attachments found
    in ``msg.raw`` are handed to ``new_media``.
    """
    logger.debug("channel: %s", msg.channel_id)
    logger.debug("username: %s", msg.user.username)
    if 'name' in msg.user:
        logger.debug("name: %s", msg.user.name)
    # Content is optional (see the check further down), so do not assume it
    # exists just for the sake of a debug line.
    if 'content' in msg:
        logger.debug("text: %s", msg.content.text)

    # ignore messages posted by the bridge
    if msg.user.username == config['MATRIX_PNUT_USER']:
        return
    if msg.source.id == config['PNUTCLIENT_ID']:
        return

    room = _room_for_message(msg, meta)
    logger.debug(room)
    if room is None:
        logger.debug('-not_mapped-')
        return

    matrix_id = matrix_id_from_pnut(msg.user.username)

    profile = get_matrix_profile(matrix_id)
    if not profile:
        new_matrix_user(msg.user.username)
        logger.debug('-new_user-')
        profile = {'displayname': None}

    # .get(): a profile without a display name has no 'displayname' key.
    if profile.get('displayname') != matrix_display_from_pnut(msg.user):
        set_matrix_display(msg.user)
        logger.debug('-set_display-')

    avatar = Avatars.query.filter(
        Avatars.pnut_user == msg.user.username).one_or_none()
    if avatar is None or avatar.avatar != msg.user.content.avatar_image.url:
        set_matrix_avatar(msg.user)
        logger.debug('-set_avatar-')

    # TODO: sort out room invite and join logic
    join_room(room.room_id, matrix_id)

    if 'content' not in msg:
        return

    matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
                               token=config['MATRIX_AS_TOKEN'],
                               identity=matrix_id)
    text = _format_message_text(msg.content)
    ts = int(time.time()) * 1000
    r = matrix_api.send_message(room.room_id, text, timestamp=ts)
    event = Events(
        event_id=r['event_id'],
        room_id=room.room_id,
        pnut_msg_id=msg.id,
        pnut_user_id=msg.user.id,
        pnut_chan_id=msg.channel_id,
        deleted=False)
    db_session.add(event)
    db_session.commit()

    if 'raw' in msg:
        logger.debug('-handle media uploads-')
        new_media(room.room_id, msg)
def new_media(room_id, msg):
    """Re-post the photo attachments of a pnut message to a Matrix room.

    Iterates the ``io.pnut.core.oembed`` entries of ``msg.raw``. Photos are
    downloaded, uploaded to the Matrix media store and sent as ``m.image``
    events on behalf of the sender's puppet user; each sent event is recorded
    in ``Events``. Every other attachment type is only logged and skipped.
    """
    sender_id = matrix_id_from_pnut(msg.user.username)
    api = MatrixHttpApi(config['MATRIX_HOST'],
                        token=config['MATRIX_AS_TOKEN'],
                        identity=sender_id)
    ts = int(time.time()) * 1000

    if 'io.pnut.core.oembed' not in msg.raw:
        return

    # Attachment types that are recognised but not bridged, with the debug
    # line emitted for each of them.
    skipped = {
        'audio': "* recieved audio attachment",
        'video': "* recieved video attachment",
        'html5video': "* recieved html5 video attachment",
        'rich': "* recieved video attachment",
    }

    for oembed in msg.raw['io.pnut.core.oembed']:
        kind = oembed.type
        if kind != 'photo':
            logger.debug(skipped.get(kind, "* recieved unknown attachment"))
            continue

        msgtype = 'm.image'
        info = {'h': oembed.height, 'w': oembed.width}

        dl = requests.get(oembed.url, stream=True)
        dl.raise_for_status()
        payload = dl.content
        with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
            info['mimetype'] = m.id_buffer(payload)
        info['size'] = len(payload)
        ul = api.media_upload(payload, info['mimetype'])

        title = oembed.title if 'title' in oembed else ""

        r = api.send_content(room_id, ul['content_uri'], title, msgtype,
                             extra_information=info, timestamp=ts)
        db_session.add(Events(
            event_id=r['event_id'],
            room_id=room_id,
            pnut_msg_id=msg.id,
            pnut_user_id=msg.user.id,
            pnut_chan_id=msg.channel_id,
            deleted=False))
        db_session.commit()
def delete_message(msg):
    """Redact every Matrix event that was bridged from the pnut message *msg*.

    All ``Events`` rows for ``msg.id`` that are not yet marked deleted are
    redacted on behalf of the sender's puppet user and then flagged as
    deleted.
    """
    sender_id = matrix_id_from_pnut(msg.user.username)
    api = MatrixHttpApi(config['MATRIX_HOST'],
                        token=config['MATRIX_AS_TOKEN'],
                        identity=sender_id)

    # "== False" is intentional: it builds a SQL expression, not a Python bool.
    not_yet_deleted = and_(Events.pnut_msg_id == msg.id,
                           Events.deleted == False)  # noqa: E712
    for bridged in Events.query.filter(not_yet_deleted).all():
        api.redact_event(bridged.room_id, bridged.event_id)
        bridged.deleted = True
        db_session.commit()
def matrix_id_from_pnut(username):
    """Return the Matrix user id of the puppet user for pnut *username*.

    The id has the form ``@<MATRIX_PNUT_PREFIX><username>:<MATRIX_DOMAIN>``.
    """
    localpart = config['MATRIX_PNUT_PREFIX'] + username
    return "".join(("@", localpart, ":", config['MATRIX_DOMAIN']))
def matrix_display_from_pnut(user):
    """Return the Matrix display name to use for a pnut *user*.

    ``"<name> (@<username>)"`` when the user has a ``name`` entry, otherwise
    just ``"@<username>"``.
    """
    handle = "@" + user.username
    if 'name' not in user:
        return handle
    return user.name + " (" + handle + ")"
def get_matrix_profile(matrix_id):
    """Fetch the profile of *matrix_id* from the homeserver.

    Returns the decoded JSON body on HTTP 200 and ``None`` for any other
    status code.
    """
    resp = requests.get(matrix_url + '/profile/' + matrix_id)
    return resp.json() if resp.status_code == 200 else None
def set_matrix_display(user):
    """Set the puppet user's Matrix display name from the pnut *user*."""
    puppet_id = matrix_id_from_pnut(user.username)
    api = MatrixHttpApi(config['MATRIX_HOST'],
                        token=config['MATRIX_AS_TOKEN'],
                        identity=puppet_id)
    display_name = matrix_display_from_pnut(user)
    api.set_display_name(puppet_id, display_name)
def set_matrix_avatar(user):
    """Copy the pnut *user*'s avatar image to the Matrix puppet user.

    The image is downloaded, uploaded to the Matrix media store and set as
    the puppet's avatar; the source URL is then stored in ``Avatars``.
    A ``MatrixRequestError`` while setting the avatar is logged and
    swallowed; download/upload errors propagate.
    """
    puppet_id = matrix_id_from_pnut(user.username)
    api = MatrixHttpApi(config['MATRIX_HOST'],
                        token=config['MATRIX_AS_TOKEN'],
                        identity=puppet_id)

    dl = requests.get(user.content.avatar_image.url, stream=True)
    dl.raise_for_status()
    image = dl.content
    with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
        mtype = m.id_buffer(image)
    ul = api.media_upload(image, mtype)

    try:
        api.set_avatar_url(puppet_id, ul['content_uri'])
        record = Avatars.query.filter(
            Avatars.pnut_user == user.username).one_or_none()
        if record is not None:
            record.avatar = user.content.avatar_image.url
        else:
            record = Avatars(pnut_user=user.username,
                             avatar=user.content.avatar_image.url)
            db_session.add(record)
        db_session.commit()
    except MatrixRequestError:
        logger.exception('failed to set user avatar')
def new_matrix_user(username):
    """Register the Matrix puppet user for pnut *username*.

    Registration uses the application-service login type with the localpart
    ``<MATRIX_PNUT_PREFIX><username>``.
    """
    api = MatrixHttpApi(config['MATRIX_HOST'],
                        token=config['MATRIX_AS_TOKEN'])
    registration = {
        'type': 'm.login.application_service',
        'username': config['MATRIX_PNUT_PREFIX'] + username,
    }
    api.register(content=registration)
def join_room(room_id, matrix_id):
    """Join *matrix_id* to *room_id*, inviting it first when required.

    The join is attempted as the user itself. If the homeserver answers 403,
    the application-service user invites the user and the join is retried
    once. Any other ``MatrixRequestError`` is logged and swallowed.
    """
    as_api = MatrixHttpApi(config['MATRIX_HOST'],
                           token=config['MATRIX_AS_TOKEN'])
    user_api = MatrixHttpApi(config['MATRIX_HOST'],
                             token=config['MATRIX_AS_TOKEN'],
                             identity=matrix_id)

    try:
        user_api.join_room(room_id)
    except MatrixRequestError as err:
        if err.code != 403:
            logger.exception('failed to join room')
        else:
            # Not allowed in yet: invite as the appservice, then retry.
            as_api.invite_user(room_id, matrix_id)
            user_api.join_room(room_id)

    logger.debug('-room_join-')
def new_room(pnut_user, invitees, chan):
    """Create a direct Matrix room for a pnut private channel.

    Args:
        pnut_user: Matrix id of the puppet user that creates the room.
        invitees: Matrix ids to invite into the new room.
        chan: id of the pnut channel the room is mapped to.

    Returns:
        The stored ``DirectRooms`` mapping, or ``None`` when the homeserver
        did not return a room id or when *invitees* is empty (in which case
        no mapping is stored, as before).
    """
    url = matrix_url + '/createRoom'
    params = {"access_token": config['MATRIX_AS_TOKEN'], "user_id": pnut_user}
    content = {"visibility": "private", "is_direct": True, "invite": invitees}
    headers = {"Content-Type": "application/json"}
    r = requests.post(url, headers=headers, params=params,
                      data=json.dumps(content))
    response = r.json()

    if 'room_id' not in response:
        # Report the homeserver's answer instead of failing with a bare
        # KeyError; callers already treat None as "no room".
        logger.error('failed to create direct room for channel %s: %s',
                     chan, response)
        return None

    if not invitees:
        return None

    # One mapping per channel. Previously an identical row was added for
    # every invitee, which makes a later one_or_none() lookup by pnut_chan
    # fail as soon as there is more than one invitee.
    dr = DirectRooms(room_id=response['room_id'],
                     bridge_user=pnut_user, pnut_chan=chan)
    logger.debug(dr)
    db_session.add(dr)
    db_session.commit()
    return dr
def on_message(ws, message):
    """Websocket callback: dispatch a pnut stream payload.

    Only payloads that carry ``data`` and whose ``meta.channel_type`` is a
    chat or private-message channel are processed. Each data item is turned
    into a pnutpy ``Message`` and passed to ``delete_message`` (when
    ``meta.is_deleted`` is present and true) or to ``new_message`` (when
    ``meta`` has no ``is_deleted`` entry).
    """
    payload = json.loads(message)
    meta = payload['meta']
    logger.debug(meta)

    if 'data' not in payload or 'channel_type' not in meta:
        return
    if meta['channel_type'] not in ('io.pnut.core.chat', 'io.pnut.core.pm'):
        return

    for item in payload['data']:
        pmsg = pnutpy.models.Message.from_response_data(item)
        if 'is_deleted' not in meta:
            new_message(pmsg, meta)
        elif meta['is_deleted']:
            logger.debug("message: delete")
            delete_message(pmsg)
def on_error(ws, error):
    """Websocket callback: log an error reported for the connection."""
    banner = "on_error: !!! ERROR !!!"
    logger.error(banner)
    logger.error(error)
def on_close(ws, close_status_code=None, close_msg=None):
    """Websocket callback: log that the connection was closed.

    ``close_status_code`` and ``close_msg`` are optional so the callback can
    be invoked either with only the websocket or with the additional close
    details that newer websocket-client releases pass along.
    """
    logger.debug("on_close: ### CLOSED ###")
def on_open(ws):
    """Websocket callback: start the keepalive thread for the connection.

    The thread sends a "." every three seconds until shutdown or reconnect
    is signalled. When sending fails because the connection is closed, the
    reconnect event is set, which ends this thread.
    """

    def run(*args):
        # Event.is_set() replaces the deprecated isSet() alias.
        while not _shutdown.is_set() and not _reconnect.is_set():
            time.sleep(3)
            try:
                ws.send(".")
            # Use the public export rather than the private _exceptions path.
            except websocket.WebSocketConnectionClosedException:
                logger.debug('websocket closed exception caught...')
                _reconnect.set()

        time.sleep(1)
        logger.debug("*** terminate thread ***")

    t = threading.Thread(target=run)
    t.start()
def wsthreader(threadfunc):
    """Wrap *threadfunc* in a loop that re-runs it until shutdown.

    The returned callable clears the reconnect event, runs *threadfunc* and
    inspects its return value: a truthy result waits five seconds and runs
    it again, a falsy result signals shutdown and ends the loop.
    """

    def wrapper():
        # Event.is_set() replaces the deprecated isSet() alias.
        while not _shutdown.is_set():
            _reconnect.clear()
            logger.debug('threadfunc start...')
            running = threadfunc()
            logger.debug('threadfunc end...')
            if running:
                time.sleep(5)
            else:
                _shutdown.set()
        logger.debug('*** thread stopped ***')

    return wrapper
if __name__ == '__main__':
    # NOTE: this block intentionally stays at module level. The functions
    # above read `config` and `matrix_url` as module globals, so these names
    # must be assigned here rather than inside a main() function.
    a_parser = argparse.ArgumentParser()
    a_parser.add_argument(
        '-d', action='store_true', dest='debug',
        help="debug logging"
    )
    # TODO: solve the database.py problem and enable this
    # a_parser.add_argument(
    #     '-c', '--config', default="config.yaml",
    #     help="configuration file"
    # )
    # NOTE(review): the parsed `debug` flag is not used anywhere below.
    args = a_parser.parse_args()

    configyaml = os.environ.get("CONFIG_FILE")
    if not configyaml:
        # Fail with a clear message instead of open(None) raising TypeError.
        a_parser.error(
            "the CONFIG_FILE environment variable must be set to the "
            "path of the configuration file")

    with open(configyaml, "rb") as config_file:
        config = yaml.load(config_file, Loader=yaml.SafeLoader)

    # websocket.enableTrace(True)
    logging.config.dictConfig(config['logging'])
    redact_filter = MLogFilter()
    logging.getLogger("werkzeug").addFilter(redact_filter)
    logging.getLogger("urllib3.connectionpool").addFilter(redact_filter)

    ws_url = 'wss://stream.pnut.io/v1/app?access_token='
    ws_url += config['PNUT_APPTOKEN'] + '&key=' + config['PNUT_APPKEY']
    ws_url += '&include_raw=1'
    matrix_url = config['MATRIX_HOST'] + '/_matrix/client/r0'

    # setup the database connection
    init_db()

    # setup the websocket connection
    ws = websocket.WebSocketApp(ws_url, on_message=on_message,
                                on_error=on_error, on_close=on_close)
    ws.on_open = on_open
    wst = threading.Thread(target=wsthreader(ws.run_forever))
    wst.start()

    try:
        # setup the matrix app service
        if config['MATRIX_ADMIN_ROOM']:
            logger.debug("- sould join admin room -")
            join_room(config['MATRIX_ADMIN_ROOM'], config['MATRIX_AS_ID'])
        app.config.update(config)
        app.run(host=config['LISTEN_HOST'], port=config['LISTEN_PORT'])
    finally:
        # Always stop the websocket worker, including when startup or the
        # app service raises; its non-daemon threads would otherwise keep
        # the process alive.
        logger.info('!! shutdown initiated !!')
        _shutdown.set()
        ws.close()
        time.sleep(2)