channel monitor

This commit is contained in:
Morgan McMillian 2017-03-04 18:14:32 -08:00
parent 34e580284d
commit ffc4af4b9b
2 changed files with 181 additions and 8 deletions

View file

@ -12,14 +12,6 @@ db.init_app(app)
txId = 0
@app.route("/testme/<hello>")
def on_testme(hello):
logging.debug(hello)
user = MatrixUser.query.filter_by(matrix_id=hello).first()
logging.debug(user)
logging.debug(app.config['MATRIX_PNUT_PREFIX'])
return jsonify({})
@app.route("/transactions/<transaction>", methods=["PUT"])
def on_receive_events(transaction):
global txId

181
pnut-bridge.py Normal file
View file

@ -0,0 +1,181 @@
import yaml
import logging
import threading
import signal
import time
import datetime
from pnutlib import Pnut
from appservice import app
from models import *
_shutdown = threading.Event()
class ChannelMonitor(threading.Thread):
txId = 0
def __init__(self):
threading.Thread.__init__(self)
self.matrix_api_url = app.config['MATRIX_HOST'] + '/_matrix/client/r0'
self.matrix_api_token = app.config['MATRIX_AS_TOKEN']
def generate_matrix_id(self, username):
m_username = app.config['MATRIX_PNUT_PREFIX'] + username
m_userid = "@" + app.config['MATRIX_PNUT_PREFIX'] + username + ":" + app.config['MATRIX_DOMAIN']
m_display = username + " (pnut)"
return {'username': m_username, 'user_id': m_userid, 'displayname': m_display}
def is_registered(self, user_id):
url = self.matrix_api_url + '/profile/' + user_id
r = requests.get(url)
if r.status_code == 200:
rdata = r.json()
if 'displayname' in rdata:
return rdata['displayname']
else:
return False
else:
return False
def create_matrix_user(self, username):
url = self.matrix_api_url + '/register'
params = {
'access_token': self.matrix_api_token
}
data = {
'type': 'm.login.application_service',
'user': username
}
r = requests.post(url, params=params, data=json.dumps(data))
if r.status_code == 200:
logging.info('REGISTERED USER: ' + username)
logging.debug(r.text)
def set_displayname(self, muser):
url = self.matrix_api_url + '/profile/' + muser['user_id'] + '/displayname'
params = {
'access_token': self.matrix_api_token,
'user_id': muser['user_id']
}
data = {
'displayname': muser['displayname']
}
headers = {'Content-Type': 'application/json'}
r = requests.put(url, params=params, data=json.dumps(data), headers=headers)
def join_room(self, user_id, roomid):
url = self.matrix_api_url + '/join/' + roomid
params = {
'access_token': self.matrix_api_token,
'user_id': user_id
}
r = requests.post(url, params=params)
def send_message(self, roomid, msg):
url = self.matrix_api_url + '/rooms/' + roomid +'/send/m.room.message' + '/' + str(txId)
params = {
'access_token': self.matrix_api_token,
'user_id': msg['user']['user_id'],
'ts': msg['ts']
}
data = {
'msgtype': 'm.text',
'body': msg['text']
}
headers = {'Content-Type': 'application/json'}
r = requests.put(url, params=params, data=json.dumps(data), headers=headers)
if r.status_code == 200:
eventid = r.json()['event_id']
el = MatrixMsgEvents(eventid, roomid, msg['msgid'], msg['puser'], msg['chan'])
db.session.add(el)
db.session.commit()
def poll_channel(self, room):
r = Pnut().get_channel_stream(room.pnut_chan, room.pnut_since)
if r is not None and r.status_code == 200:
rdata = r.json()
pqueue = []
for post in rdata['data']:
if post['user']['username'] == app.config['MATRIX_PNUT_USER']:
continue
if post['source']['id'] == app.config['PNUTCLIENT_ID']:
continue
if 'content' in post:
user = self.generate_matrix_id(post['user']['username'])
text = post['content']['text'] + "\n"
dt = datetime.datetime.strptime(post["created_at"], "%Y-%m-%dT%H:%M:%SZ")
ts = time.mktime(dt.timetuple())
msgid = post['id']
puser = post['user']['username']
# handle entities
if 'entities' in post['content']:
if 'links' in post['content']['entities']:
for lnk in post['content']['entities']['links']:
text += "\n"
if 'title' in lnk:
text += lnk['title'] + "\n"
if 'link' in lnk:
text += lnk['link'] + "\n"
# handle raw data
if 'raw' in post:
for raw in post['raw']:
text += "\n"
if raw['type'] == 'io.pnut.core.oembed':
if 'title' in raw['value']:
text += raw['value']['title'] + "\n"
if 'url' in raw['value']:
text += raw['value']['url'] + "\n"
pqueue.insert(0,
{'user':user,'text':text,'ts':ts,'msgid':msgid,'puser':puser,'chan':room.pnut_chan})
if len(rdata["data"]) > 0:
room.pnut_since = rdata["data"][0]["id"]
db.session.commit()
for item in pqueue:
txId += 1
d = self.is_registered(item['user']['user_id'])
if not d:
self.create_matrix_user(item['user']['username'])
if d != item['user']['displayname']:
self.set_displayname(item['user'])
self.join_room(item['user']['user_id'], room.room_id)
time.sleep(1)
self.send_message(room.room_id, item)
def run(self):
logging.info("-- Starting channel monitor --")
app.app_context().push()
while not _shutdown.isSet():
rooms = MatrixRoom.query.all()
for r in rooms:
self.poll_channel(r)
time.sleep(15)
logging.info("-- Stopping channel monitor --")
if __name__ == '__main__':
with open("config.yaml", "rb") as config_file:
config = yaml.load(config_file)
logging.basicConfig(level=logging.DEBUG)
app.config.update(config)
monitor = ChannelMonitor()
monitor.start()
app.run(port=config['LISTEN_PORT'])
_shutdown.set()
logging.info("-- Shutdown in progress --")