reinitiate connection on websocket error resolves issue #15

This commit is contained in:
Morgan McMillian 2020-01-27 22:09:39 -08:00
parent 6092607207
commit b77e105d5c

View file

@ -13,8 +13,10 @@ from database import db_session, init_db
from sqlalchemy import and_ from sqlalchemy import and_
from models import Karma, Optout, Queue, Preferences, MdnpRequests from models import Karma, Optout, Queue, Preferences, MdnpRequests
_startup = threading.Event()
_shutdown = threading.Event() _shutdown = threading.Event()
_connected = threading.Event() _connected = threading.Event()
_error = threading.Event()
def subscribe(connection_id): def subscribe(connection_id):
url = f"https://api.pnut.io/v0/channels/{config['CHANNEL']}/messages" url = f"https://api.pnut.io/v0/channels/{config['CHANNEL']}/messages"
@ -23,7 +25,9 @@ def subscribe(connection_id):
r = requests.get(url, headers=headers) r = requests.get(url, headers=headers)
if r.status_code == 200: if r.status_code == 200:
_connected.set() _connected.set()
send(config['CHANNEL'], "Partybot online and ready! \o/") if _startup.isSet():
send(config['CHANNEL'], "Partybot online and ready! \o/")
_startup.clear()
else: else:
logger.error(r) logger.error(r)
@ -282,7 +286,8 @@ def on_message(ws, message):
msg = json.loads(message) msg = json.loads(message)
if not _connected.isSet() and 'connection_id' in msg['meta']: if not _connected.isSet() and 'connection_id' in msg['meta']:
send(config['CHANNEL'], "...connecting circuits...") if _startup.isSet():
send(config['CHANNEL'], "...connecting circuits...")
logger.debug("connection_id: " + msg['meta']['connection_id']) logger.debug("connection_id: " + msg['meta']['connection_id'])
subscribe(msg['meta']['connection_id']) subscribe(msg['meta']['connection_id'])
return return
@ -317,26 +322,24 @@ def on_message(ws, message):
def on_error(ws, error): def on_error(ws, error):
logger.error("on_error: !!! ERROR !!!") logger.error("on_error: !!! ERROR !!!")
logger.error(error) logger.error(error)
# _shutdown.set() _error.set()
def on_close(ws): def on_close(ws):
send(config['CHANNEL'], "...shutdown initiated...") # send(config['CHANNEL'], "...shutdown initiated...")
logger.debug("on_close: ### CLOSED ###") logger.debug("on_close: ### CLOSED ###")
# _shutdown.set() _connected.clear()
def on_open(ws): def on_open(ws):
def run(*args): def run(*args):
while not _shutdown.isSet(): while not _error.isSet():
qmsg = Queue.query.one_or_none() qmsg = Queue.query.one_or_none()
if qmsg: if qmsg:
send(config['CHANNEL'], qmsg.msg) send(config['CHANNEL'], qmsg.msg)
db_session.delete(qmsg) db_session.delete(qmsg)
db_session.commit() db_session.commit()
time.sleep(3)
ws.send(".") ws.send(".")
time.sleep(1) time.sleep(3)
ws.close()
logger.debug("*** terminate ***") logger.debug("*** terminate ***")
t = threading.Thread(target=run) t = threading.Thread(target=run)
@ -361,4 +364,8 @@ if __name__ == "__main__":
ws = websocket.WebSocketApp(ws_url, on_message=on_message, ws = websocket.WebSocketApp(ws_url, on_message=on_message,
on_error=on_error, on_close=on_close) on_error=on_error, on_close=on_close)
ws.on_open = on_open ws.on_open = on_open
ws.run_forever() r = True
_startup.set()
while r:
_error.clear()
r = ws.run_forever()