From 5b4b5102a7fb638070d372a741a1ab3b2139dc9d Mon Sep 17 00:00:00 2001 From: Morgan McMillian Date: Sun, 25 Mar 2018 11:34:43 -0700 Subject: [PATCH] bi-directional message delete resolves #2 track channel by stream marker resolves #26 --- appservice.py | 4 +++- pnut-bridge.py | 42 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/appservice.py b/appservice.py index ec92bbe..562bd9f 100644 --- a/appservice.py +++ b/appservice.py @@ -140,7 +140,7 @@ def on_receive_events(transaction): elif event['type'] == 'm.room.redaction': - r_event = MatrixMsgEvents.query.filter_by(event_id=event['redacts'],room_id=event['room_id']).first() + r_event = MatrixMsgEvents.query.filter_by(event_id=event['redacts'],room_id=event['room_id'],deleted=False).first() if r_event: if r_event.pnut_user == app.config['MATRIX_PNUT_USER']: token = app.config['MATRIX_PNUT_TOKEN'] @@ -151,6 +151,8 @@ def on_receive_events(transaction): else: return jsonify({}) + r_event.deleted = True + db.session.commit() pnutpy.api.add_authorization_token(token) r, meta = pnutpy.api.delete_message(r_event.pnut_chan, r_event.pnut_msgid) diff --git a/pnut-bridge.py b/pnut-bridge.py index c313805..65a3c2e 100644 --- a/pnut-bridge.py +++ b/pnut-bridge.py @@ -113,16 +113,50 @@ class ChannelMonitor(threading.Thread): logging.debug('error: ' + str(r.status_code)) logging.debug(r.text) + def delete_msgs(self, room, ids): + for i in ids: + r_event = MatrixMsgEvents.query.filter_by(pnut_msgid=i,deleted=False).first() + logging.debug(r_event) + if r_event: + logging.debug(r_event.pnut_msgid) + logging.debug(r_event.event_id) + logging.debug(r_event.deleted) + self.redact_event(room, r_event) + + def redact_event(self, room_id, event): + url = self.matrix_api_url + "/rooms/" + room_id + "/redact/" + event.event_id + "/" + str(self.txId) + headers = {"Content-Type": "application/json"} + params = {'access_token': self.matrix_api_token} + try: + event.deleted = True + db.session.commit() + r = requests.put(url, params=params, data="{}", headers=headers) + logging.debug(r.text) + except Exception as e: + logging.exception('redact_event') + + def update_marker(self, chan, msgid): + try: + response = pnutpy.api.request_json('POST', '/markers', data=[{'name': 'channel:' + chan, 'id': msgid}]) + logging.debug(response) + except Exception as e: + logging.exception('update_marker') + def poll_channel(self, room): try: - messages, meta = pnutpy.api.get_channel_messages(room.pnut_chan, since_id=room.pnut_since, include_raw=1) + # messages, meta = pnutpy.api.get_channel_messages(room.pnut_chan, since_id=room.pnut_since, include_raw=1) + messages, meta = pnutpy.api.get_channel_messages(room.pnut_chan, since_id='last_read', include_raw=1) + logging.debug(meta) + if 'deleted_ids' in meta: + self.delete_msgs(room.room_id, meta['deleted_ids']) except pnutpy.errors.PnutRateLimitAPIException: logging.warning('*** Rate limit error while trying to fetch messages! Waiting to retry. ***') time.sleep(30) return - except: + except Exception as e: logging.warning('*** An error occured while trying to fetch messages! Waiting to retry. ***') + logging.exception('poll_channel') time.sleep(30) return @@ -162,6 +196,8 @@ class ChannelMonitor(threading.Thread): # update the last message id if len(messages) > 0: room.pnut_since = messages[0].id + if 'max_id' in meta: + self.update_marker(room.pnut_chan, meta.max_id) db.session.commit() # empty the queue to the matrix room @@ -191,7 +227,7 @@ class ChannelMonitor(threading.Thread): rooms = MatrixRoom2.query.all() for r in rooms: self.poll_channel(r) - time.sleep(.5) + time.sleep(5) time.sleep(3) logging.info("-- Stopping channel monitor --")