add keep alive task to websocket connection and split global room

This commit is contained in:
Morgan McMillian 2025-01-17 15:33:47 -08:00
parent d53a233dc7
commit 58a9de6eef
2 changed files with 103 additions and 11 deletions

View file

@ -423,7 +423,7 @@ def create_pnut_global_room(ctx):
# }
data = {
'visibility': "public",
'name': "Pnut Global Stream",
'name': "🥜 Pnut Global Stream 🌎",
'room_alias_name': f"{config['matrix']['namespace']}global"
}
logging.debug(data)
@ -436,6 +436,80 @@ def create_pnut_global_room(ctx):
click.echo(r.status_code)
click.echo(r.text)
@cmd.command()
@click.pass_context
def create_pnut_news_room(ctx):
config = ctx.obj['config']
if 'matrix' not in config:
click.echo("Matrix configuration missing!")
exit(1)
if 'as_token' not in config['matrix']:
click.echo("matrix appservice token missing from configuration!")
exit(1)
if 'namespace' not in config['matrix']:
click.echo("matrix namespace missing from configuration!")
exit(1)
endpoint = "/_matrix/client/v3/createRoom"
url = config['matrix']['homeserver'] + endpoint
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + config['matrix']['as_token']
}
data = {
'visibility': "public",
'name': "🥜 Pnut News Stream 📰",
'room_alias_name': f"{config['matrix']['namespace']}news"
}
logging.debug(data)
r = requests.post(url, headers=headers, json=data)
if r.status_code == 200:
click.echo(json.dumps(r.json(), indent=4))
else:
click.echo(r.status_code)
click.echo(r.text)
@cmd.command()
@click.pass_context
def create_pnut_bot_room(ctx):
config = ctx.obj['config']
if 'matrix' not in config:
click.echo("Matrix configuration missing!")
exit(1)
if 'as_token' not in config['matrix']:
click.echo("matrix appservice token missing from configuration!")
exit(1)
if 'namespace' not in config['matrix']:
click.echo("matrix namespace missing from configuration!")
exit(1)
endpoint = "/_matrix/client/v3/createRoom"
url = config['matrix']['homeserver'] + endpoint
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + config['matrix']['as_token']
}
data = {
'visibility': "public",
'name': "🥜 Pnut Bot Stream 🤖",
'room_alias_name': f"{config['matrix']['namespace']}bots"
}
logging.debug(data)
r = requests.post(url, headers=headers, json=data)
if r.status_code == 200:
click.echo(json.dumps(r.json(), indent=4))
else:
click.echo(r.status_code)
click.echo(r.text)
@cmd.command()
@click.argument('room_id')
@click.argument('matrix_id')

View file

@ -11,6 +11,7 @@ import argparse
import os
import re
import asyncio
import itertools
from mautrix.client import ClientAPI
from mautrix.types import TextMessageEventContent, Format, MessageType
@ -147,16 +148,7 @@ async def new_pnut_message(msg, meta):
async def new_pnut_post(post, meta):
if not config['pnut']['global_stream']:
return
if (config['pnut']['global_humans_only'] and
post.user.type in ['feed', 'bot']):
logging.debug('-skipping non human post-')
return
if 'content' in post:
text = ""
if 'repost_of' in post:
text += f"<{post.user.username}> reposted >> "
@ -188,7 +180,18 @@ async def new_pnut_post(post, meta):
await set_matrix_avatar(post.user)
logger.debug('-set_avatar-')
room_id = config['pnut']['global_room']
if post.user.type == 'feed' and config['pnut']['news_stream']:
room_id = config['pnut']['news_room']
elif post.user.type == 'bot' and config['pnut']['bot_stream']:
room_id = config['pnut']['bot_room']
elif config['pnut']['global_stream']:
room_id = config['pnut']['global_room']
else:
return
await matrix_api.join_room(room_id)
postlink = f"https://posts.pnut.io/{post.id}"
plaintext = f"{post.content.text}\n{postlink}"
@ -445,6 +448,17 @@ async def on_message(message):
pnut_post = pnutpy.models.Post.from_response_data(item)
await new_pnut_post(pnut_post, meta)
async def keepalive(websocket, ping_interval=30):
logging.debug('starting keeplive task')
for ping in itertools.count():
await asyncio.sleep(ping_interval)
try:
logging.debug('sending . to pnut')
await websocket.send('.')
except ConnectionClosed:
logging.debug('connection closed')
break
async def asmain():
# if config['MATRIX_ADMIN_ROOM']:
# logger.debug("- sould join admin room -")
@ -457,6 +471,7 @@ async def asmain():
ws_url += config['pnut']['app_token'] + '&key=' + config['pnut']['app_key']
ws_url += '&include_raw=1'
async for websocket in connect(uri=ws_url):
keepalive_task = asyncio.create_task(keepalive(websocket))
try:
async for message in websocket:
await on_message(message)
@ -466,6 +481,9 @@ async def asmain():
except ConnectionClosed:
continue
finally:
keepalive_task.cancel()
def main():
global config
a_parser = argparse.ArgumentParser()