Code uplift to replace old libraries and support asyncio

matrix-client was replaced with mautrix, issues #56 #59
websocket-client was replaced with websockets
flask updated to support async
early support for global timeline, issue #71
This commit is contained in:
Morgan McMillian 2024-12-15 15:04:17 -08:00
parent 409b2a5a3c
commit 5638805481
4 changed files with 710 additions and 583 deletions

View file

@ -7,19 +7,19 @@ This bridge will pass pnut.io channel messages through to Matrix, and Matrix mes
## Usage ## Usage
The public bridge is once again online! The public bridge is not yet online pending uplift of this code base. Stay tuned!
See [Using-the-public-bridge](https://gitlab.com/thrrgilag/pnut-matrix/-/wikis/Using-the-public-bridge) for details.
## Installation ## Installation
**Warning! This code is extremely unstable and not yet ready for use on your matrix server.**
Currently pnut-matrix has been only tested with and confirmed to work with [synapse]. Please refer to the [synapse installation instructions] for details on how to setup your homeserver. Currently pnut-matrix has been only tested with and confirmed to work with [synapse]. Please refer to the [synapse installation instructions] for details on how to setup your homeserver.
To install the latest version of pnut-matrix from source: To install the latest version of pnut-matrix from source:
```sh ```sh
git clone https://gitlab.com/thrrgilag/pnut-matrix.git git clone https://git.dreamfall.space/spacenerdmo/pnut-matrix.git
cd pnut-matrix cd pnut-matrix
python3 -m venv env python3 -m venv env
source env/bin/activate source env/bin/activate
@ -43,12 +43,11 @@ curl --data '{"type": "m.login.application_service", "username": "your_sender_lo
## Contributing and support ## Contributing and support
You can open issues for bugs or feature requests and you can submit merge requests to this project on [GitLab]. You can also submit issues and patches directly to [morgan@mcmillian.dev]. Please submit bugs, feature requests, and patches to [morgan@mcmillian.dev].
Join my public chat room for development discussion. Join my public chat room on pnut.io for development discussion.
- [pnut-matrix on pnut.io] - [pnut-matrix]
- [#pnut_999:pnut-matrix.dreamfall.space]
## License ## License
@ -58,9 +57,7 @@ GPLv3, see [LICENSE].
[synapse]: https://github.com/matrix-org/synapse [synapse]: https://github.com/matrix-org/synapse
[synapse installation instructions]: https://matrix-org.github.io/synapse/latest/setup/installation.html [synapse installation instructions]: https://matrix-org.github.io/synapse/latest/setup/installation.html
[syanpse configuration]: https://matrix-org.github.io/synapse/latest/application_services.html [syanpse configuration]: https://matrix-org.github.io/synapse/latest/application_services.html
[GitLab]: https://gitlab.com/thrrgilag/pnut-matrix/
[morgan@mcmillian.dev]: mailto:morgan@mcmillian.dev [morgan@mcmillian.dev]: mailto:morgan@mcmillian.dev
[pnut-matrix on pnut.io]: https://patter.chat/999 [pnut-matrix]: https://patter.chat/999
[#pnut_999:pnut-matrix.dreamfall.space]: https://matrix.to/#/#pnut_999:pnut-matrix.dreamfall.space
[LICENSE]: LICENSE [LICENSE]: LICENSE
[^1]: https://github.com/matrix-org/matrix-appservice-irc/issues/1270#issuecomment-849765090 [^1]: https://github.com/matrix-org/matrix-appservice-irc/issues/1270#issuecomment-849765090

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,3 @@
import websocket
import threading
import time import time
import logging import logging
import logging.config import logging.config
@ -11,19 +9,22 @@ import magic
import argparse import argparse
import os import os
import re import re
import asyncio
from mautrix.client import ClientAPI
from mautrix.types import TextMessageEventContent, Format, MessageType
from mautrix.errors import MatrixConnectionError
from mautrix.errors.request import MNotFound, MForbidden
from websockets.asyncio.client import connect
from websockets.exceptions import ConnectionClosed
from matrix_client.api import MatrixHttpApi
from matrix_client.api import MatrixError, MatrixRequestError
from models import Avatars, Rooms, Events, DirectRooms, Users from models import Avatars, Rooms, Events, DirectRooms, Users
from database import db_session, init_db from database import db_session, init_db
from sqlalchemy import and_ from sqlalchemy import and_
from appservice import app
logger = logging.getLogger() logger = logging.getLogger()
_shutdown = threading.Event()
_reconnect = threading.Event()
class MLogFilter(logging.Filter): class MLogFilter(logging.Filter):
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(\s.*)$") ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(\s.*)$")
@ -31,15 +32,17 @@ class MLogFilter(logging.Filter):
def filter(self, record): def filter(self, record):
if record.name == "werkzeug" and len(record.args) > 0: if record.name == "werkzeug" and len(record.args) > 0:
redacted_uri = MLogFilter.ACCESS_TOKEN_RE.sub(r"\1<redacted>\3", record.args[0]) redacted_uri = MLogFilter.ACCESS_TOKEN_RE.sub(r"\1<redacted>\3",
record.args[0])
record.args = (redacted_uri, ) + record.args[1:] record.args = (redacted_uri, ) + record.args[1:]
elif record.name == "urllib3.connectionpool" and len(record.args) > 3: elif record.name == "urllib3.connectionpool" and len(record.args) > 3:
redacted_uri = MLogFilter.ACCESS_TOKEN_RE2.sub(r"\1<redacted>\3", record.args[4]) redacted_uri = MLogFilter.ACCESS_TOKEN_RE2.sub(r"\1<redacted>\3",
record.args[4])
record.args = record.args[:4] + (redacted_uri,) + record.args[5:] record.args = record.args[:4] + (redacted_uri,) + record.args[5:]
return True return True
def new_message(msg, meta): async def new_pnut_message(msg, meta):
logger.debug("channel: " + msg.channel_id) logger.debug("channel: " + msg.channel_id)
logger.debug("username: " + msg.user.username) logger.debug("username: " + msg.user.username)
if 'name' in msg.user: if 'name' in msg.user:
@ -53,10 +56,18 @@ def new_message(msg, meta):
if msg.source.id == config['PNUTCLIENT_ID']: if msg.source.id == config['PNUTCLIENT_ID']:
return return
matrix_id = matrix_id_from_pnut(msg.user.username)
matrix_api = ClientAPI(config['MATRIX_AS_ID'],
base_url=config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'],
as_user_id=matrix_id.lower())
if meta['channel_type'] == 'io.pnut.core.chat': if meta['channel_type'] == 'io.pnut.core.chat':
room = Rooms.query.filter(Rooms.pnut_chan == msg.channel_id).one_or_none() room = Rooms.query.filter(Rooms.pnut_chan ==
msg.channel_id).one_or_none()
elif meta['channel_type'] == 'io.pnut.core.pm': elif meta['channel_type'] == 'io.pnut.core.pm':
room = DirectRooms.query.filter(DirectRooms.pnut_chan == msg.channel_id).one_or_none() room = DirectRooms.query.filter(DirectRooms.pnut_chan ==
msg.channel_id).one_or_none()
if room is None: if room is None:
# Do do an invite from the bridge user? # Do do an invite from the bridge user?
logger.debug('new invite?') logger.debug('new invite?')
@ -64,12 +75,13 @@ def new_message(msg, meta):
# subscribed_user_ids from meta # subscribed_user_ids from meta
logger.debug(meta['subscribed_user_ids']) logger.debug(meta['subscribed_user_ids'])
pnut_user = matrix_id_from_pnut(msg.user.username) pnut_user = matrix_id_from_pnut(msg.user.username)
profile = get_matrix_profile(pnut_user) profile = await matrix_api.get_profile(matrix_id.lower())
if not profile: if not profile:
new_matrix_user(msg.user.username) new_matrix_user(msg.user.username)
invitees=[] invitees=[]
for pm_user in meta['subscribed_user_ids']: for pm_user in meta['subscribed_user_ids']:
user = Users.query.filter(Users.pnut_user_id == pm_user).one_or_none() user = Users.query.filter(Users.pnut_user_id ==
pm_user).one_or_none()
if int(pm_user) == msg.user.id: if int(pm_user) == msg.user.id:
continue continue
if user is not None: if user is not None:
@ -84,49 +96,39 @@ def new_message(msg, meta):
logger.debug('-not_mapped-') logger.debug('-not_mapped-')
return return
matrix_id = matrix_id_from_pnut(msg.user.username) try:
matrix_api = MatrixHttpApi(config['MATRIX_HOST'], profile = await matrix_api.get_profile(matrix_id.lower())
token=config['MATRIX_AS_TOKEN'], logger.debug(profile)
identity=matrix_id)
profile = get_matrix_profile(matrix_id) except MNotFound:
if not profile:
new_matrix_user(msg.user.username) new_matrix_user(msg.user.username)
logger.debug('-new_user-') logger.debug('-new_user-')
profile = {'displayname': None} profile = {'displayname': None}
if profile['displayname'] != matrix_display_from_pnut(msg.user): if profile['displayname'] != matrix_display_from_pnut(msg.user):
set_matrix_display(msg.user) await set_matrix_display(msg.user)
logger.debug('-set_display-') logger.debug('-set_display-')
avatar = Avatars.query.filter(Avatars.pnut_user == msg.user.username).one_or_none() avatar = Avatars.query.filter(Avatars.pnut_user ==
msg.user.username).one_or_none()
if avatar is None or avatar.avatar != msg.user.content.avatar_image.url: if avatar is None or avatar.avatar != msg.user.content.avatar_image.url:
set_matrix_avatar(msg.user) await set_matrix_avatar(msg.user)
logger.debug('-set_avatar-') logger.debug('-set_avatar-')
# members = matrix_api.get_room_members(room.room_id) # members = matrix_api.get_room_members(room.room_id)
# logger.debug(members) # logger.debug(members)
# join_room(room.room_id, config['MATRIX_AS_ID']) # join_room(room.room_id, config['MATRIX_AS_ID'])
# TODO: sort out room invite and join logic # TODO: sort out room invite and join logic
join_room(room.room_id, matrix_id) await join_room(room.room_id, matrix_id)
if 'content' in msg: if 'content' in msg:
text = msg.content.text + "\n" eventtext = TextMessageEventContent(msgtype=MessageType.TEXT,
ts = int(time.time()) * 1000 format=Format.HTML,
body=msg.content.text,
lnktext = "" formatted_body=msg.content.html)
for link in msg.content.entities.links: rid = await matrix_api.send_message(room.room_id, eventtext)
if 'title' in link:
lnktext += link.title + "\n"
if 'url' in link:
lnktext += link.url + "\n"
if len(lnktext) > 0:
text += "\n" + lnktext
r = matrix_api.send_message(room.room_id, text, timestamp=ts)
event = Events( event = Events(
event_id=r['event_id'], event_id=rid,
room_id=room.room_id, room_id=room.room_id,
pnut_msg_id=msg.id, pnut_msg_id=msg.id,
pnut_user_id=msg.user.id, pnut_user_id=msg.user.id,
@ -137,14 +139,80 @@ def new_message(msg, meta):
if 'raw' in msg: if 'raw' in msg:
logger.debug('-handle media uploads-') logger.debug('-handle media uploads-')
new_media(room.room_id, msg) await new_media(room.room_id, msg)
def new_media(room_id, msg): async def new_pnut_post(post, meta):
matrix_id = matrix_id_from_pnut(msg.user.username)
matrix_api = MatrixHttpApi(config['MATRIX_HOST'], if not config['PNUT_GLOBAL']:
return
if (config['PNUT_GLOBAL_HUMAN_ONLY'] and
post.user.type in ['feed', 'bot']):
logging.debug('-skipping non human post-')
return
if 'content' in post:
text = ""
if 'repost_of' in post:
text += f"<{post.user.username}> reposted >> "
post = post.repost_of
matrix_id = matrix_id_from_pnut(post.user.username)
matrix_api = ClientAPI(config['MATRIX_AS_ID'],
base_url=config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'], token=config['MATRIX_AS_TOKEN'],
identity=matrix_id) as_user_id=matrix_id.lower())
ts = int(time.time()) * 1000 try:
profile = await matrix_api.get_profile(matrix_id.lower())
except MNotFound:
new_matrix_user(post.user.username)
profile = {'displayname': None}
if profile['displayname'] != matrix_display_from_pnut(post.user):
await set_matrix_display(post.user)
logger.debug('-set_display-')
avatar = Avatars.query.filter(Avatars.pnut_user ==
post.user.username).one_or_none()
if (avatar is None or
avatar.avatar != post.user.content.avatar_image.url):
await set_matrix_avatar(post.user)
logger.debug('-set_avatar-')
room_id = config['MATRIX_GLOBAL_ROOM']
await join_room(room_id, matrix_id)
postlink = f"https://posts.pnut.io/{post.id}"
plaintext = f"{post.content.text}\n{postlink}"
htmltext = (f"{post.content.html}"
f" &nbsp;<a href='{postlink}'>[🔗]</a>")
eventtext = TextMessageEventContent(msgtype=MessageType.TEXT,
format=Format.HTML,
body=plaintext,
formatted_body=htmltext)
rid = await matrix_api.send_message(room_id, eventtext)
event = Events(
event_id=rid,
room_id=room_id,
pnut_msg_id=post.id,
pnut_user_id=post.user.id,
pnut_chan_id=0,
deleted=False)
db_session.add(event)
db_session.commit()
if 'raw' in post:
logger.debug('-handle media uploads-')
await new_media(room_id, post)
async def new_media(room_id, msg):
matrix_id = matrix_id_from_pnut(msg.user.username)
matrix_api = ClientAPI(config['MATRIX_AS_ID'],
base_url=config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'],
as_user_id=matrix_id.lower())
if 'io.pnut.core.oembed' in msg.raw: if 'io.pnut.core.oembed' in msg.raw:
@ -157,16 +225,20 @@ def new_media(room_id, msg):
info['h'] = oembed.height info['h'] = oembed.height
info['w'] = oembed.width info['w'] = oembed.width
elif oembed.type == 'audio': elif oembed.type == 'audio':
logger.debug("* recieved audio attachment") msgtype = 'm.audio'
continue dl_url = oembed.url
elif oembed.type == 'video': elif oembed.type == 'video':
logger.debug("* recieved video attachment") msgtype = 'm.video'
continue dl_url = oembed.url
info['h'] = oembed.height
info['w'] = oembed.width
elif oembed.type == 'html5video': elif oembed.type == 'html5video':
logger.debug("* recieved html5 video attachment") msgtype = 'm.video'
continue dl_url = oembed.url
info['h'] = oembed.height
info['w'] = oembed.width
elif oembed.type == 'rich': elif oembed.type == 'rich':
logger.debug("* recieved video attachment") logger.debug("* recieved rich attachment")
continue continue
else: else:
logger.debug("* recieved unknown attachment") logger.debug("* recieved unknown attachment")
@ -177,128 +249,178 @@ def new_media(room_id, msg):
with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m: with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
info['mimetype'] = m.id_buffer(dl.content) info['mimetype'] = m.id_buffer(dl.content)
info['size'] = len(dl.content) info['size'] = len(dl.content)
ul = matrix_api.media_upload(dl.content, info['mimetype']) ul = await matrix_api.upload_media(dl.content,
mime_type=info['mimetype'])
if 'title' in oembed: if 'title' in oembed:
title = oembed.title title = oembed.title
else: else:
title = "" title = ""
r = matrix_api.send_content(room_id, ul['content_uri'], title, msgtype, extra_information=info, timestamp=ts) rid = await matrix_api.send_file(room_id, ul,
file_name=title,
file_type=msgtype,
info=info)
if 'channel_id' in msg:
channel_id = msg.channel_id
else:
channel_id = 0
event = Events( event = Events(
event_id=r['event_id'], event_id=rid,
room_id=room_id, room_id=room_id,
pnut_msg_id=msg.id, pnut_msg_id=msg.id,
pnut_user_id=msg.user.id, pnut_user_id=msg.user.id,
pnut_chan_id=msg.channel_id, pnut_chan_id=channel_id,
deleted=False) deleted=False)
db_session.add(event) db_session.add(event)
db_session.commit() db_session.commit()
def delete_message(msg): async def delete_message(msg):
matrix_id = matrix_id_from_pnut(msg.user.username) matrix_id = matrix_id_from_pnut(msg.user.username)
matrix_api = MatrixHttpApi(config['MATRIX_HOST'], matrix_api = ClientAPI(config['MATRIX_AS_ID'],
base_url=config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'], token=config['MATRIX_AS_TOKEN'],
identity=matrix_id) as_user_id=matrix_id.lower())
events = Events.query.filter(and_(Events.pnut_msg_id == msg.id, Events.deleted == False)).all() events = Events.query.filter(and_(Events.pnut_msg_id ==
msg.id, Events.deleted == False)).all()
for event in events: for event in events:
matrix_api.redact_event(event.room_id, event.event_id) await matrix_api.redact(event.room_id, event.event_id)
event.deleted = True event.deleted = True
db_session.commit() db_session.commit()
def matrix_id_from_pnut(username): def matrix_id_from_pnut(username):
return "@" + config['MATRIX_PNUT_PREFIX'] + username + ":" + config['MATRIX_DOMAIN'] matrix_id = (f"@{config['MATRIX_PNUT_PREFIX']}{username}"
f":{config['MATRIX_DOMAIN']}")
return matrix_id
def matrix_display_from_pnut(user): def matrix_display_from_pnut(user):
if user.type == 'bot':
icon = ' 🤖'
elif user.type == 'feed':
icon = ' 📰'
else:
icon = ''
if 'name' in user: if 'name' in user:
display = user.name + " (@" + user.username + ")" display = user.name + " (@" + user.username + ")" + icon
else: else:
display = "@" + user.username display = "@" + user.username + icon
return display return display
# return user.username + " (pnut)"
def get_matrix_profile(matrix_id): async def set_matrix_display(user):
url = matrix_url + '/profile/' + matrix_id
r = requests.get(url)
if r.status_code == 200:
return r.json()
else:
return None
def set_matrix_display(user):
matrix_id = matrix_id_from_pnut(user.username) matrix_id = matrix_id_from_pnut(user.username)
matrix_api = MatrixHttpApi(config['MATRIX_HOST'], matrix_api = ClientAPI(config['MATRIX_AS_ID'],
base_url=config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'], token=config['MATRIX_AS_TOKEN'],
identity=matrix_id) as_user_id=matrix_id.lower())
matrix_api.set_display_name(matrix_id, matrix_display_from_pnut(user)) await matrix_api.set_displayname(matrix_display_from_pnut(user))
def set_matrix_avatar(user): async def set_matrix_avatar(user):
matrix_id = matrix_id_from_pnut(user.username) matrix_id = matrix_id_from_pnut(user.username)
matrix_api = MatrixHttpApi(config['MATRIX_HOST'], matrix_api = ClientAPI(config['MATRIX_AS_ID'],
base_url=config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'], token=config['MATRIX_AS_TOKEN'],
identity=matrix_id) as_user_id=matrix_id.lower())
dl = requests.get(user.content.avatar_image.url, stream=True) dl = requests.get(user.content.avatar_image.url, stream=True)
dl.raise_for_status() dl.raise_for_status()
with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m: with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
mtype = m.id_buffer(dl.content) mtype = m.id_buffer(dl.content)
ul = matrix_api.media_upload(dl.content, mtype) ul = await matrix_api.upload_media(dl.content, mtype)
logger.debug(ul)
try: try:
matrix_api.set_avatar_url(matrix_id, ul['content_uri']) await matrix_api.set_avatar_url(ul)
avatar = Avatars.query.filter(Avatars.pnut_user == user.username).one_or_none() avatar = Avatars.query.filter(Avatars.pnut_user ==
user.username).one_or_none()
if avatar is None: if avatar is None:
avatar = Avatars(pnut_user=user.username, avatar=user.content.avatar_image.url) avatar = Avatars(pnut_user=user.username,
avatar=user.content.avatar_image.url)
db_session.add(avatar) db_session.add(avatar)
else: else:
avatar.avatar = user.content.avatar_image.url avatar.avatar = user.content.avatar_image.url
db_session.commit() db_session.commit()
except MatrixRequestError: except Exception:
logger.exception('failed to set user avatar') logger.exception('failed to set user avatar')
def new_matrix_user(username): def new_matrix_user(username):
matrix_api = MatrixHttpApi(config['MATRIX_HOST'], endpoint = "/_matrix/client/v3/register"
token=config['MATRIX_AS_TOKEN']) url = config['MATRIX_HOST'] + endpoint
params = {'kind': 'user'}
data = { data = {
'type': 'm.login.application_service', 'type': 'm.login.application_service',
'username': config['MATRIX_PNUT_PREFIX'] + username 'username': config['MATRIX_PNUT_PREFIX'] + username
} }
matrix_api.register(content=data) headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + config['MATRIX_AS_TOKEN']
}
logger.debug(data)
r = requests.post(url, headers=headers, json=data, params=params)
if r.status_code == 200:
return
def join_room(room_id, matrix_id): else:
matrix_api_as = MatrixHttpApi(config['MATRIX_HOST'], errmsg = f"- unable to register {username} -"
logger.warning(errmsg)
logger.debug(r.status_code)
logger.debug(r.text)
return
async def join_room(room_id, matrix_id):
logging.debug('----- trying to join room -----')
matrix_api_as = ClientAPI(config['MATRIX_AS_ID'],
base_url=config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN']) token=config['MATRIX_AS_TOKEN'])
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
matrix_api = ClientAPI(config['MATRIX_AS_ID'],
base_url=config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'], token=config['MATRIX_AS_TOKEN'],
identity=matrix_id) as_user_id=matrix_id.lower())
try: try:
matrix_api.join_room(room_id) await matrix_api.join_room(room_id)
# logging.debug('----- should be joined -----')
except MatrixRequestError as e: except MForbidden:
if e.code == 403: # logging.debug('------ got a forbidden ------')
matrix_api_as.invite_user(room_id, matrix_id) await matrix_api_as.invite_user(room_id, matrix_id.lower())
matrix_api.join_room(room_id) await matrix_api.join_room(room_id)
else:
except MatrixConnectionError:
# logger.debug(e)
# if 'code' in e and e.code == 403:
# await matrix_api_as.invite_user(room_id, matrix_id)
# await matrix_api.join_room(room_id)
# else:
logging.debug('------- moar join errors -------')
logger.exception('failed to join room') logger.exception('failed to join room')
logger.debug(f"{room_id}")
logger.debug('-room_join-') logger.debug('-room_join-')
def new_room(pnut_user, invitees, chan): def new_room(pnut_user, invitees, chan):
dr = None dr = None
matrix_api = MatrixHttpApi(config['MATRIX_HOST'],
token=config['MATRIX_AS_TOKEN'],
identity=pnut_user)
url = matrix_url + '/createRoom' url = matrix_url + '/createRoom'
params = {"access_token": config['MATRIX_AS_TOKEN'], "user_id": pnut_user} params = {
content = {"visibility": "private", "is_direct": True, "invite": invitees} "access_token": config['MATRIX_AS_TOKEN'],
"user_id": pnut_user.lower()
}
content = {
"visibility": "private",
"is_direct": True,
"invite": invitees
}
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
r = requests.post(url, headers=headers, params=params, r = requests.post(url, headers=headers, params=params,
data=json.dumps(content)) data=json.dumps(content))
response = r.json() response = r.json()
logger.debug(r.status_code)
logger.debug(r.text)
logger.debug(response)
for bridge_user in invitees: for bridge_user in invitees:
dr = DirectRooms(room_id=response['room_id'], dr = DirectRooms(room_id=response['room_id'],
bridge_user=pnut_user, pnut_chan=chan) bridge_user=pnut_user, pnut_chan=chan)
@ -308,83 +430,71 @@ def new_room(pnut_user, invitees, chan):
return dr return dr
def on_message(ws, message): async def on_message(message):
# logger.debug("on_message: " + message) logger.debug("on_message: " + message)
msg = json.loads(message) msg = json.loads(message)
logger.debug(msg['meta'])
if 'data' in msg: if 'meta' in msg:
meta = msg['meta']
if 'channel_type' in msg['meta']: else:
if msg['meta']['channel_type'] not in ['io.pnut.core.chat',
'io.pnut.core.pm']:
return return
for d_item in msg['data']: if 'data' in msg:
pmsg = pnutpy.models.Message.from_response_data(d_item) data = msg['data']
if 'is_deleted' in msg['meta']:
if msg['meta']['is_deleted']:
logger.debug("message: delete")
delete_message(pmsg)
else: else:
new_message(pmsg, msg['meta']) return
def on_error(ws, error): if 'type' in meta:
logger.error("on_error: !!! ERROR !!!")
logger.error(error)
def on_close(ws): if meta['type'] == "message":
logger.debug("on_close: ### CLOSED ###")
def on_open(ws): channel_types = ['io.pnut.core.chat', 'io.pnut.core.pm']
if meta['channel_type'] not in channel_types:
return
def run(*args): for item in data:
while not _shutdown.isSet() and not _reconnect.isSet(): pnut_msg = pnutpy.models.Message.from_response_data(item)
time.sleep(3)
if 'is_deleted' in meta and meta['is_deleted']:
logger.debug("-message: delete-")
delete_message(pnut_msg)
else:
await new_pnut_message(pnut_msg, meta)
elif meta['type'] == "post":
for item in data:
pnut_post = pnutpy.models.Post.from_response_data(item)
await new_pnut_post(pnut_post, meta)
async def asmain():
if config['MATRIX_ADMIN_ROOM']:
logger.debug("- sould join admin room -")
await join_room(config['MATRIX_ADMIN_ROOM'], config['MATRIX_AS_ID'])
ws_url = 'wss://stream.pnut.io/v1/app?access_token='
ws_url += config['PNUT_APPTOKEN'] + '&key=' + config['PNUT_APPKEY']
ws_url += '&include_raw=1'
async for websocket in connect(uri=ws_url):
try: try:
ws.send(".") async for message in websocket:
except websocket._exceptions.WebSocketConnectionClosedException: await on_message(message)
logger.debug('websocket closed exception caught...')
_reconnect.set()
time.sleep(1) await websocket.close()
logger.debug("*** terminate thread ***")
t = threading.Thread(target=run) except ConnectionClosed:
t.start() continue
def wsthreader(threadfunc):
def wrapper():
while not _shutdown.isSet():
_reconnect.clear()
logger.debug('threadfunc start...')
running = threadfunc()
logger.debug('threadfunc end...')
if running:
time.sleep(5)
else:
_shutdown.set()
logger.debug('*** thread stopped ***')
return wrapper
if __name__ == '__main__': if __name__ == '__main__':
a_parser = argparse.ArgumentParser() a_parser = argparse.ArgumentParser()
a_parser.add_argument( a_parser.add_argument(
'-d', action='store_true', dest='debug', '-d', action='store_true', dest='debug',
help="debug logging" help="debug logging"
) )
# TODO: solve the database.py problem and enable this a_parser.add_argument('-c', '--config', dest='configyaml',
# a_parser.add_argument( default="config.yaml", help="configuration file")
# '-c', '--config', default="config.yaml",
# help="configuration file"
# )
args = a_parser.parse_args() args = a_parser.parse_args()
configyaml = os.environ.get("CONFIG_FILE") configyaml = os.environ.get("CONFIG_FILE")
@ -392,35 +502,16 @@ if __name__ == '__main__':
with open(configyaml, "rb") as config_file: with open(configyaml, "rb") as config_file:
config = yaml.load(config_file, Loader=yaml.SafeLoader) config = yaml.load(config_file, Loader=yaml.SafeLoader)
# websocket.enableTrace(True)
logging.config.dictConfig(config['logging']) logging.config.dictConfig(config['logging'])
redact_filter = MLogFilter() redact_filter = MLogFilter()
logging.getLogger("werkzeug").addFilter(redact_filter) logging.getLogger("werkzeug").addFilter(redact_filter)
logging.getLogger("urllib3.connectionpool").addFilter(redact_filter) logging.getLogger("urllib3.connectionpool").addFilter(redact_filter)
ws_url = 'wss://stream.pnut.io/v1/app?access_token=' matrix_url = config['MATRIX_HOST'] + '/_matrix/client/v3'
ws_url += config['PNUT_APPTOKEN'] + '&key=' + config['PNUT_APPKEY']
ws_url += '&include_raw=1'
matrix_url = config['MATRIX_HOST'] + '/_matrix/client/r0'
# setup the database connection # setup the database connection
init_db() init_db()
# setup the websocket connection asyncio.run(asmain())
ws = websocket.WebSocketApp(ws_url, on_message=on_message,
on_error=on_error, on_close=on_close)
ws.on_open = on_open
wst = threading.Thread(target=wsthreader(ws.run_forever))
wst.start()
# setup the matrix app service
if config['MATRIX_ADMIN_ROOM']:
logger.debug("- sould join admin room -")
join_room(config['MATRIX_ADMIN_ROOM'], config['MATRIX_AS_ID'])
app.config.update(config)
app.run(host=config['LISTEN_HOST'], port=config['LISTEN_PORT'])
logger.info('!! shutdown initiated !!') logger.info('!! shutdown initiated !!')
_shutdown.set()
ws.close()
time.sleep(2)

View file

@ -1,8 +1,9 @@
pyyaml pyyaml
requests requests
matrix-client==0.3.2 Flask[async]
Flask pnutpy>=0.5.0
pnutpy
sqlalchemy sqlalchemy
websocket-client
filemagic filemagic
mautrix>=0.20.6,<0.21
websockets
asyncclick