pnut-matrix/pnut-matrix.py

427 lines
14 KiB
Python
Raw Permalink Normal View History

2019-01-04 03:49:38 +00:00
import websocket
import threading
import time
import logging
import logging.config
2019-01-04 03:49:38 +00:00
import yaml
import json
import pnutpy
import requests
import magic
import argparse
import os
import re
2019-01-04 03:49:38 +00:00
from matrix_client.api import MatrixHttpApi
from matrix_client.api import MatrixError, MatrixRequestError
from models import Avatars, Rooms, Events, DirectRooms, Users
2019-01-04 03:49:38 +00:00
from database import db_session, init_db
from sqlalchemy import and_
from appservice import app
logger = logging.getLogger()
_shutdown = threading.Event()
_reconnect = threading.Event()
2019-01-04 03:49:38 +00:00
class MLogFilter(logging.Filter):
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(\s.*)$")
ACCESS_TOKEN_RE2 = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")
def filter(self, record):
if record.name == "werkzeug" and len(record.args) > 0:
redacted_uri = MLogFilter.ACCESS_TOKEN_RE.sub(r"\1<redacted>\3", record.args[0])
record.args = (redacted_uri, ) + record.args[1:]
elif record.name == "urllib3.connectionpool" and len(record.args) > 3:
redacted_uri = MLogFilter.ACCESS_TOKEN_RE2.sub(r"\1<redacted>\3", record.args[4])
record.args = record.args[:4] + (redacted_uri,) + record.args[5:]
return True
def new_message(msg, meta):
2019-01-04 03:49:38 +00:00
logger.debug("channel: " + msg.channel_id)
logger.debug("username: " + msg.user.username)
if 'name' in msg.user:
logger.debug("name: " + msg.user.name)
logger.debug("text: " + msg.content.text)
# ignore messages posted by the bridge
if msg.user.username == config['MATRIX_PNUT_USER']:
return
if msg.source.id == config['PNUTCLIENT_ID']:
return
if meta['channel_type'] == 'io.pnut.core.chat':
room = Rooms.query.filter(Rooms.pnut_chan == msg.channel_id).one_or_none()
elif meta['channel_type'] == 'io.pnut.core.pm':
room = DirectRooms.query.filter(DirectRooms.pnut_chan == msg.channel_id).one_or_none()
if room is None:
# Do do an invite from the bridge user?
logger.debug('new invite?')
# create room and included matrix recpient
# subscribed_user_ids from meta
logger.debug(meta['subscribed_user_ids'])
pnut_user = matrix_id_from_pnut(msg.user.username)
profile = get_matrix_profile(pnut_user)
if not profile:
new_matrix_user(msg.user.username)
invitees=[]
for pm_user in meta['subscribed_user_ids']:
user = Users.query.filter(Users.pnut_user_id == pm_user).one_or_none()
if int(pm_user) == msg.user.id:
continue
if user is not None:
invitees.append(user.matrix_id)
if len(invitees) > 0:
room = new_room(pnut_user, invitees, msg.channel_id)
else:
room = None
2019-01-04 03:49:38 +00:00
logger.debug(room)
if room is None:
logger.debug('-not_mapped-')
return
matrix_id = matrix_id_from_pnut(msg.user.username)
2023-02-16 02:08:51 +00:00
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
2019-01-04 03:49:38 +00:00
token=config['MATRIX_AS_TOKEN'],
identity=matrix_id)
profile = get_matrix_profile(matrix_id)
if not profile:
new_matrix_user(msg.user.username)
logger.debug('-new_user-')
profile = {'displayname': None}
2023-02-16 02:08:51 +00:00
2019-01-04 03:49:38 +00:00
if profile['displayname'] != matrix_display_from_pnut(msg.user):
set_matrix_display(msg.user)
logger.debug('-set_display-')
2023-02-16 02:08:51 +00:00
2019-01-04 03:49:38 +00:00
avatar = Avatars.query.filter(Avatars.pnut_user == msg.user.username).one_or_none()
2021-01-18 05:45:50 +00:00
if avatar is None or avatar.avatar != msg.user.content.avatar_image.url:
2019-01-04 03:49:38 +00:00
set_matrix_avatar(msg.user)
logger.debug('-set_avatar-')
# members = matrix_api.get_room_members(room.room_id)
# logger.debug(members)
# join_room(room.room_id, config['MATRIX_AS_ID'])
# TODO: sort out room invite and join logic
join_room(room.room_id, matrix_id)
if 'content' in msg:
text = msg.content.text + "\n"
ts = int(time.time()) * 1000
2019-01-04 03:49:38 +00:00
lnktext = ""
for link in msg.content.entities.links:
if 'title' in link:
lnktext += link.title + "\n"
2021-01-18 05:45:50 +00:00
if 'url' in link:
lnktext += link.url + "\n"
2019-01-04 03:49:38 +00:00
if len(lnktext) > 0:
text += "\n" + lnktext
r = matrix_api.send_message(room.room_id, text, timestamp=ts)
event = Events(
2023-02-16 02:08:51 +00:00
event_id=r['event_id'],
room_id=room.room_id,
pnut_msg_id=msg.id,
pnut_user_id=msg.user.id,
2019-01-04 03:49:38 +00:00
pnut_chan_id=msg.channel_id,
deleted=False)
db_session.add(event)
db_session.commit()
2021-01-18 05:47:31 +00:00
if 'raw' in msg:
2019-01-04 03:49:38 +00:00
logger.debug('-handle media uploads-')
new_media(room.room_id, msg)
def new_media(room_id, msg):
matrix_id = matrix_id_from_pnut(msg.user.username)
2023-02-16 02:08:51 +00:00
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
2019-01-04 03:49:38 +00:00
token=config['MATRIX_AS_TOKEN'],
identity=matrix_id)
ts = int(time.time()) * 1000
2019-01-04 03:49:38 +00:00
2021-01-18 05:47:31 +00:00
if 'io.pnut.core.oembed' in msg.raw:
2019-01-04 03:49:38 +00:00
2021-01-18 05:47:31 +00:00
for oembed in msg.raw['io.pnut.core.oembed']:
info = {}
if oembed.type == 'photo':
msgtype = 'm.image'
dl_url = oembed.url
info['h'] = oembed.height
info['w'] = oembed.width
elif oembed.type == 'audio':
logger.debug("* recieved audio attachment")
continue
elif oembed.type == 'video':
logger.debug("* recieved video attachment")
continue
elif oembed.type == 'html5video':
logger.debug("* recieved html5 video attachment")
continue
elif oembed.type == 'rich':
logger.debug("* recieved video attachment")
continue
else:
logger.debug("* recieved unknown attachment")
continue
dl = requests.get(dl_url, stream=True)
2019-01-04 03:49:38 +00:00
dl.raise_for_status()
with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
2021-01-18 05:47:31 +00:00
info['mimetype'] = m.id_buffer(dl.content)
info['size'] = len(dl.content)
ul = matrix_api.media_upload(dl.content, info['mimetype'])
2019-01-04 03:49:38 +00:00
2021-01-18 05:47:31 +00:00
if 'title' in oembed:
title = oembed.title
2019-01-04 03:49:38 +00:00
else:
2021-01-18 05:47:31 +00:00
title = ""
2019-01-04 03:49:38 +00:00
2021-01-18 05:47:31 +00:00
r = matrix_api.send_content(room_id, ul['content_uri'], title, msgtype, extra_information=info, timestamp=ts)
2019-01-04 03:49:38 +00:00
event = Events(
2023-02-16 02:08:51 +00:00
event_id=r['event_id'],
room_id=room_id,
pnut_msg_id=msg.id,
pnut_user_id=msg.user.id,
2019-01-04 03:49:38 +00:00
pnut_chan_id=msg.channel_id,
deleted=False)
db_session.add(event)
db_session.commit()
def delete_message(msg):
matrix_id = matrix_id_from_pnut(msg.user.username)
2023-02-16 02:08:51 +00:00
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
2019-01-04 03:49:38 +00:00
token=config['MATRIX_AS_TOKEN'],
identity=matrix_id)
events = Events.query.filter(and_(Events.pnut_msg_id == msg.id, Events.deleted == False)).all()
for event in events:
matrix_api.redact_event(event.room_id, event.event_id)
event.deleted = True
db_session.commit()
def matrix_id_from_pnut(username):
return "@" + config['MATRIX_PNUT_PREFIX'] + username + ":" + config['MATRIX_DOMAIN']
def matrix_display_from_pnut(user):
if 'name' in user:
2022-07-20 23:29:53 +00:00
display = user.name + " (@" + user.username + ")"
2019-01-04 03:49:38 +00:00
else:
display = "@" + user.username
return display
# return user.username + " (pnut)"
def get_matrix_profile(matrix_id):
url = matrix_url + '/profile/' + matrix_id
r = requests.get(url)
if r.status_code == 200:
return r.json()
else:
return None
def set_matrix_display(user):
matrix_id = matrix_id_from_pnut(user.username)
2023-02-16 02:08:51 +00:00
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
2019-01-04 03:49:38 +00:00
token=config['MATRIX_AS_TOKEN'],
identity=matrix_id)
matrix_api.set_display_name(matrix_id, matrix_display_from_pnut(user))
def set_matrix_avatar(user):
matrix_id = matrix_id_from_pnut(user.username)
2023-02-16 02:08:51 +00:00
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
2019-01-04 03:49:38 +00:00
token=config['MATRIX_AS_TOKEN'],
identity=matrix_id)
2021-01-18 05:45:50 +00:00
dl = requests.get(user.content.avatar_image.url, stream=True)
2019-01-04 03:49:38 +00:00
dl.raise_for_status()
with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
mtype = m.id_buffer(dl.content)
ul = matrix_api.media_upload(dl.content, mtype)
try:
matrix_api.set_avatar_url(matrix_id, ul['content_uri'])
avatar = Avatars.query.filter(Avatars.pnut_user == user.username).one_or_none()
if avatar is None:
2021-01-18 05:45:50 +00:00
avatar = Avatars(pnut_user=user.username, avatar=user.content.avatar_image.url)
db_session.add(avatar)
else:
2021-01-18 05:45:50 +00:00
avatar.avatar = user.content.avatar_image.url
2019-01-04 03:49:38 +00:00
db_session.commit()
except MatrixRequestError:
logger.exception('failed to set user avatar')
def new_matrix_user(username):
2023-02-16 02:08:51 +00:00
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
2019-01-04 03:49:38 +00:00
token=config['MATRIX_AS_TOKEN'])
data = {
'type': 'm.login.application_service',
'username': config['MATRIX_PNUT_PREFIX'] + username
2019-01-04 03:49:38 +00:00
}
matrix_api.register(content=data)
def join_room(room_id, matrix_id):
2023-02-16 02:08:51 +00:00
matrix_api_as = MatrixHttpApi(config['MATRIX_HOST'],
2019-01-04 03:49:38 +00:00
token=config['MATRIX_AS_TOKEN'])
2023-02-16 02:08:51 +00:00
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
2019-01-04 03:49:38 +00:00
token=config['MATRIX_AS_TOKEN'],
identity=matrix_id)
2023-02-16 02:08:51 +00:00
2019-01-04 03:49:38 +00:00
try:
matrix_api.join_room(room_id)
2023-02-16 02:08:51 +00:00
2019-01-04 03:49:38 +00:00
except MatrixRequestError as e:
if e.code == 403:
matrix_api_as.invite_user(room_id, matrix_id)
matrix_api.join_room(room_id)
else:
logger.exception('failed to join room')
logger.debug('-room_join-')
def new_room(pnut_user, invitees, chan):
dr = None
2023-02-16 02:08:51 +00:00
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'],
identity=pnut_user)
url = matrix_url + '/createRoom'
params = {"access_token": config['MATRIX_AS_TOKEN'], "user_id": pnut_user}
content = {"visibility": "private", "is_direct": True, "invite": invitees}
headers = {"Content-Type": "application/json"}
r = requests.post(url, headers=headers, params=params,
data=json.dumps(content))
response = r.json()
for bridge_user in invitees:
dr = DirectRooms(room_id=response['room_id'],
bridge_user=pnut_user, pnut_chan=chan)
logger.debug(dr)
db_session.add(dr)
db_session.commit()
return dr
2019-01-04 03:49:38 +00:00
def on_message(ws, message):
# logger.debug("on_message: " + message)
msg = json.loads(message)
logger.debug(msg['meta'])
2023-02-16 02:08:51 +00:00
2019-01-04 03:49:38 +00:00
if 'data' in msg:
if 'channel_type' in msg['meta']:
2019-01-04 03:49:38 +00:00
if msg['meta']['channel_type'] not in ['io.pnut.core.chat',
'io.pnut.core.pm']:
2019-01-04 03:49:38 +00:00
return
for d_item in msg['data']:
pmsg = pnutpy.models.Message.from_response_data(d_item)
2019-01-04 03:49:38 +00:00
if 'is_deleted' in msg['meta']:
if msg['meta']['is_deleted']:
logger.debug("message: delete")
delete_message(pmsg)
2019-01-04 03:49:38 +00:00
else:
new_message(pmsg, msg['meta'])
2019-01-04 03:49:38 +00:00
def on_error(ws, error):
logger.error("on_error: !!! ERROR !!!")
2019-01-04 03:49:38 +00:00
logger.error(error)
def on_close(ws):
logger.debug("on_close: ### CLOSED ###")
def on_open(ws):
def run(*args):
while not _shutdown.isSet() and not _reconnect.isSet():
2019-01-04 03:49:38 +00:00
time.sleep(3)
try:
ws.send(".")
except websocket._exceptions.WebSocketConnectionClosedException:
logger.debug('websocket closed exception caught...')
_reconnect.set()
2019-01-04 03:49:38 +00:00
time.sleep(1)
logger.debug("*** terminate thread ***")
2019-01-04 03:49:38 +00:00
t = threading.Thread(target=run)
t.start()
def wsthreader(threadfunc):
def wrapper():
while not _shutdown.isSet():
_reconnect.clear()
logger.debug('threadfunc start...')
running = threadfunc()
logger.debug('threadfunc end...')
if running:
time.sleep(5)
else:
_shutdown.set()
logger.debug('*** thread stopped ***')
return wrapper
2019-01-04 03:49:38 +00:00
if __name__ == '__main__':
a_parser = argparse.ArgumentParser()
a_parser.add_argument(
'-d', action='store_true', dest='debug',
help="debug logging"
)
2021-01-18 05:45:50 +00:00
# TODO: solve the database.py problem and enable this
# a_parser.add_argument(
# '-c', '--config', default="config.yaml",
# help="configuration file"
# )
args = a_parser.parse_args()
configyaml = os.environ.get("CONFIG_FILE")
with open(configyaml, "rb") as config_file:
config = yaml.load(config_file, Loader=yaml.SafeLoader)
# websocket.enableTrace(True)
logging.config.dictConfig(config['logging'])
redact_filter = MLogFilter()
logging.getLogger("werkzeug").addFilter(redact_filter)
logging.getLogger("urllib3.connectionpool").addFilter(redact_filter)
2021-01-18 05:47:31 +00:00
ws_url = 'wss://stream.pnut.io/v1/app?access_token='
2019-01-04 03:49:38 +00:00
ws_url += config['PNUT_APPTOKEN'] + '&key=' + config['PNUT_APPKEY']
ws_url += '&include_raw=1'
matrix_url = config['MATRIX_HOST'] + '/_matrix/client/r0'
# setup the database connection
init_db()
# setup the websocket connection
2023-02-16 02:08:51 +00:00
ws = websocket.WebSocketApp(ws_url, on_message=on_message,
2019-01-04 03:49:38 +00:00
on_error=on_error, on_close=on_close)
ws.on_open = on_open
wst = threading.Thread(target=wsthreader(ws.run_forever))
wst.start()
2019-01-04 03:49:38 +00:00
# setup the matrix app service
if config['MATRIX_ADMIN_ROOM']:
logger.debug("- sould join admin room -")
join_room(config['MATRIX_ADMIN_ROOM'], config['MATRIX_AS_ID'])
app.config.update(config)
app.run(host=config['LISTEN_HOST'], port=config['LISTEN_PORT'])
2019-01-04 03:49:38 +00:00
logger.info('!! shutdown initiated !!')
_shutdown.set()
ws.close()
2019-01-04 03:49:38 +00:00
time.sleep(2)