- improved thread handling

- automatic reconnect when closed remotely
issue #40
This commit is contained in:
Morgan McMillian 2019-02-02 11:50:56 -08:00
parent be10ec2346
commit febb765f81

View file

@ -18,6 +18,7 @@ from appservice import app
logger = logging.getLogger()
_shutdown = threading.Event()
_reconnect = threading.Event()
def new_message(msg):
logger.debug("channel: " + msg.channel_id)
@ -263,33 +264,50 @@ def on_error(ws, error):
def on_close(ws):
logger.debug("on_close: ### CLOSED ###")
if not _shutdown.set():
time.sleep(10)
t.start()
else:
time.sleep(2)
def on_open(ws):
def run(*args):
while not _shutdown.isSet():
while not _shutdown.isSet() and not _reconnect.isSet():
time.sleep(3)
try:
ws.send(".")
except websocket._exceptions.WebSocketConnectionClosedException:
logger.debug('websocket closed exception caught...')
_reconnect.set()
time.sleep(1)
ws.close()
logger.debug("*** terminate ***")
logger.debug("*** terminate thread ***")
t = threading.Thread(target=run)
t.start()
def wsthreader(threadfunc):
def wrapper():
while not _shutdown.isSet():
_reconnect.clear()
logger.debug('threadfunc start...')
running = threadfunc()
logger.debug('threadfunc end...')
if running:
time.sleep(5)
else:
_shutdown.set()
logger.debug('*** thread stopped ***')
return wrapper
if __name__ == '__main__':
# websocket.enableTrace(True)
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)
with open("config.yaml", "rb") as config_file:
config = yaml.load(config_file)
ws_url = 'wss://stream.pnut.io/v0/app?access_token='
# ws_url = 'wss://stream.pnut.io/v0/app?access_token='
ws_url = 'ws://192.168.1.200:9001/?'
ws_url += config['PNUT_APPTOKEN'] + '&key=' + config['PNUT_APPKEY']
ws_url += '&include_raw=1'
matrix_url = config['MATRIX_HOST'] + '/_matrix/client/r0'
@ -301,9 +319,8 @@ if __name__ == '__main__':
ws = websocket.WebSocketApp(ws_url, on_message=on_message,
on_error=on_error, on_close=on_close)
ws.on_open = on_open
t = threading.Thread(target=ws.run_forever)
t.daemon = True
t.start()
wst = threading.Thread(target=wsthreader(ws.run_forever))
wst.start()
# setup the matrix app service
if config['MATRIX_ADMIN_ROOM']:
@ -312,6 +329,7 @@ if __name__ == '__main__':
app.config.update(config)
app.run(port=config['LISTEN_PORT'])
_shutdown.set()
logger.info('!! shutdown initiated !!')
_shutdown.set()
ws.close()
time.sleep(2)