bi-directional message delete resolves #2
track channel by stream marker resolves #26
This commit is contained in:
parent
2a1e384034
commit
5b4b5102a7
2 changed files with 42 additions and 4 deletions
|
@ -140,7 +140,7 @@ def on_receive_events(transaction):
|
|||
|
||||
|
||||
elif event['type'] == 'm.room.redaction':
|
||||
r_event = MatrixMsgEvents.query.filter_by(event_id=event['redacts'],room_id=event['room_id']).first()
|
||||
r_event = MatrixMsgEvents.query.filter_by(event_id=event['redacts'],room_id=event['room_id'],deleted=False).first()
|
||||
if r_event:
|
||||
if r_event.pnut_user == app.config['MATRIX_PNUT_USER']:
|
||||
token = app.config['MATRIX_PNUT_TOKEN']
|
||||
|
@ -151,6 +151,8 @@ def on_receive_events(transaction):
|
|||
else:
|
||||
return jsonify({})
|
||||
|
||||
r_event.deleted = True
|
||||
db.session.commit()
|
||||
pnutpy.api.add_authorization_token(token)
|
||||
r, meta = pnutpy.api.delete_message(r_event.pnut_chan, r_event.pnut_msgid)
|
||||
|
||||
|
|
|
@ -113,16 +113,50 @@ class ChannelMonitor(threading.Thread):
|
|||
logging.debug('error: ' + str(r.status_code))
|
||||
logging.debug(r.text)
|
||||
|
||||
def delete_msgs(self, room, ids):
|
||||
for i in ids:
|
||||
r_event = MatrixMsgEvents.query.filter_by(pnut_msgid=i,deleted=False).first()
|
||||
logging.debug(r_event)
|
||||
if r_event:
|
||||
logging.debug(r_event.pnut_msgid)
|
||||
logging.debug(r_event.event_id)
|
||||
logging.debug(r_event.deleted)
|
||||
self.redact_event(room, r_event)
|
||||
|
||||
def redact_event(self, room_id, event):
|
||||
url = self.matrix_api_url + "/rooms/" + room_id + "/redact/" + event.event_id + "/" + str(self.txId)
|
||||
headers = {"Content-Type": "application/json"}
|
||||
params = {'access_token': self.matrix_api_token}
|
||||
try:
|
||||
event.deleted = True
|
||||
db.session.commit()
|
||||
r = requests.put(url, params=params, data="{}", headers=headers)
|
||||
logging.debug(r.text)
|
||||
except Exception as e:
|
||||
logging.exception('redact_event')
|
||||
|
||||
def update_marker(self, chan, msgid):
|
||||
try:
|
||||
response = pnutpy.api.request_json('POST', '/markers', data=[{'name': 'channel:' + chan, 'id': msgid}])
|
||||
logging.debug(response)
|
||||
except Exception as e:
|
||||
logging.exception('update_marker')
|
||||
|
||||
def poll_channel(self, room):
|
||||
|
||||
try:
|
||||
messages, meta = pnutpy.api.get_channel_messages(room.pnut_chan, since_id=room.pnut_since, include_raw=1)
|
||||
# messages, meta = pnutpy.api.get_channel_messages(room.pnut_chan, since_id=room.pnut_since, include_raw=1)
|
||||
messages, meta = pnutpy.api.get_channel_messages(room.pnut_chan, since_id='last_read', include_raw=1)
|
||||
logging.debug(meta)
|
||||
if 'deleted_ids' in meta:
|
||||
self.delete_msgs(room.room_id, meta['deleted_ids'])
|
||||
except pnutpy.errors.PnutRateLimitAPIException:
|
||||
logging.warning('*** Rate limit error while trying to fetch messages! Waiting to retry. ***')
|
||||
time.sleep(30)
|
||||
return
|
||||
except:
|
||||
except Exception as e:
|
||||
logging.warning('*** An error occured while trying to fetch messages! Waiting to retry. ***')
|
||||
logging.exception('poll_channel')
|
||||
time.sleep(30)
|
||||
return
|
||||
|
||||
|
@ -162,6 +196,8 @@ class ChannelMonitor(threading.Thread):
|
|||
# update the last message id
|
||||
if len(messages) > 0:
|
||||
room.pnut_since = messages[0].id
|
||||
if 'max_id' in meta:
|
||||
self.update_marker(room.pnut_chan, meta.max_id)
|
||||
db.session.commit()
|
||||
|
||||
# empty the queue to the matrix room
|
||||
|
@ -191,7 +227,7 @@ class ChannelMonitor(threading.Thread):
|
|||
rooms = MatrixRoom2.query.all()
|
||||
for r in rooms:
|
||||
self.poll_channel(r)
|
||||
time.sleep(.5)
|
||||
time.sleep(5)
|
||||
time.sleep(3)
|
||||
logging.info("-- Stopping channel monitor --")
|
||||
|
||||
|
|
Loading…
Reference in a new issue