From febb765f81ccf2c1c088de52bac9b073cab5dc24 Mon Sep 17 00:00:00 2001 From: Morgan McMillian Date: Sat, 2 Feb 2019 11:50:56 -0800 Subject: [PATCH] - improved thread handling - automatic reconnect when closed remotely issue #40 --- pnut-matrix.py | 50 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/pnut-matrix.py b/pnut-matrix.py index 128d6c4..a631068 100644 --- a/pnut-matrix.py +++ b/pnut-matrix.py @@ -18,6 +18,7 @@ from appservice import app logger = logging.getLogger() _shutdown = threading.Event() +_reconnect = threading.Event() def new_message(msg): logger.debug("channel: " + msg.channel_id) @@ -263,33 +264,50 @@ def on_error(ws, error): def on_close(ws): logger.debug("on_close: ### CLOSED ###") - if not _shutdown.set(): - time.sleep(10) - t.start() - else: - time.sleep(2) def on_open(ws): def run(*args): - while not _shutdown.isSet(): + while not _shutdown.isSet() and not _reconnect.isSet(): time.sleep(3) - ws.send(".") + try: + ws.send(".") + except websocket._exceptions.WebSocketConnectionClosedException: + logger.debug('websocket closed exception caught...') + _reconnect.set() + time.sleep(1) - ws.close() - logger.debug("*** terminate ***") - + logger.debug("*** terminate thread ***") + t = threading.Thread(target=run) t.start() +def wsthreader(threadfunc): + + def wrapper(): + + while not _shutdown.isSet(): + _reconnect.clear() + logger.debug('threadfunc start...') + running = threadfunc() + logger.debug('threadfunc end...') + if running: + time.sleep(5) + else: + _shutdown.set() + logger.debug('*** thread stopped ***') + + return wrapper + if __name__ == '__main__': # websocket.enableTrace(True) - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) with open("config.yaml", "rb") as config_file: config = yaml.load(config_file) - ws_url = 'wss://stream.pnut.io/v0/app?access_token=' + # ws_url = 'wss://stream.pnut.io/v0/app?access_token=' + ws_url = 'ws://192.168.1.200:9001/?' ws_url += config['PNUT_APPTOKEN'] + '&key=' + config['PNUT_APPKEY'] ws_url += '&include_raw=1' matrix_url = config['MATRIX_HOST'] + '/_matrix/client/r0' @@ -301,9 +319,8 @@ if __name__ == '__main__': ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open - t = threading.Thread(target=ws.run_forever) - t.daemon = True - t.start() + wst = threading.Thread(target=wsthreader(ws.run_forever)) + wst.start() # setup the matrix app service if config['MATRIX_ADMIN_ROOM']: @@ -312,6 +329,7 @@ if __name__ == '__main__': app.config.update(config) app.run(port=config['LISTEN_PORT']) - _shutdown.set() logger.info('!! shutdown initiated !!') + _shutdown.set() + ws.close() time.sleep(2)