Morgan McMillian
b7e016ed1b
All checks were successful
dreamfall/clacksme/pipeline/head This commit looks good
220 lines
7.1 KiB
Python
220 lines
7.1 KiB
Python
import argparse
|
|
import threading
|
|
import logging
|
|
import signal
|
|
import requests
|
|
|
|
from multiprocessing import Process, Event, Queue
|
|
from logging.handlers import QueueHandler
|
|
from imapclient import IMAPClient, exceptions
|
|
from datetime import datetime
|
|
from clacksme.model import *
|
|
|
|
# Module-wide multiprocessing Event used as the shutdown flag. main() sets it
# on KeyboardInterrupt and the SIGTERM handler is meant to set it as well; the
# MailboxNotifier loops and event_thread poll it to decide when to stop.
_shutdown = Event()
|
|
|
|
class MailboxNotifier(object):
    """Watch one mailbox's INBOX over IMAP IDLE and send notifications.

    An instance wraps a single ``Mailbox`` record.  ``start`` is the long
    running entry point (``main`` uses it as a ``Process`` target); the other
    methods are the steps of that loop.
    """

    # Seconds to pause before reconnecting after a failed IMAP session, so a
    # persistent failure is not retried in a tight loop.
    RECONNECT_DELAY = 5
    # Upper bound, in seconds, for each outgoing notification HTTP request.
    HTTP_TIMEOUT = 10

    def __init__(self, mailbox):
        """Remember the mailbox record and its last notification time."""
        self.mailbox = mailbox
        self.last_check = mailbox.last_check

    def start(self, log_queue, event_queue):
        """Connect, check unseen mail, then idle; repeat until shutdown.

        Args:
            log_queue: queue that receives this object's log records (and
                those of the ``imapclient`` logger) through a QueueHandler.
            event_queue: queue on which a dict is put for every message that
                has no stored event yet.
        """
        imap_log = logging.getLogger('imapclient')
        imap_log.setLevel(logging.INFO)
        log_queue_handler = QueueHandler(log_queue)
        self.log = logging.getLogger(self.mailbox.user)
        self.log.setLevel(logging.DEBUG)
        self.log.addHandler(log_queue_handler)
        imap_log.addHandler(log_queue_handler)
        self.event_queue = event_queue

        self.log.info(
            f"Starting up IMAP connection for {self.mailbox.user}")
        while not _shutdown.is_set():
            try:
                with IMAPClient(self.mailbox.imap_host) as self.imap:
                    self.imap.login(self.mailbox.imap_user,
                                    self.mailbox.imap_pass)
                    self.imap.select_folder("INBOX", readonly=True)
                    self.log.info("Connected to INBOX")
                    messages = self.imap.search("UNSEEN")
                    if messages:
                        self.check_events(messages)

                    self.imap_idle()

            except Exception:
                # Top-level boundary of the worker: log and reconnect.
                self.log.error("IMAP exception of some kind")
                self.log.exception("IMAP")
                # Wait on the shutdown event rather than sleeping so a
                # shutdown request still ends the loop promptly.
                _shutdown.wait(self.RECONNECT_DELAY)

    def imap_idle(self):
        """Idle on the selected folder and react to RECENT responses.

        Each pass enters IDLE, waits up to 30 seconds for server responses,
        leaves IDLE, and, if any response was a RECENT notice, searches for
        unseen messages once and hands them to ``check_events``.  Returns
        when shutdown is requested or the connection is aborted.
        """
        while not _shutdown.is_set():
            try:
                self.imap.idle()
                self.log.info("Waiting for new messages")
                responses = self.imap.idle_check(timeout=30)
                self.log.debug(responses)
                has_recent = any(
                    len(resp) == 2 and resp[1] == b"RECENT"
                    for resp in responses)

                # IDLE has to be ended before another command is issued, so
                # finish it once here instead of once per RECENT response.
                self.log.info("IDLE done")
                self.imap.idle_done()

                if has_recent:
                    messages = self.imap.search("UNSEEN")
                    if messages:
                        self.check_events(messages)

            except IMAPClient.AbortError:
                self.log.error("IMAP abort, break")
                break

    @staticmethod
    def _backoff_elapsed(last_check, now, backoff_seconds):
        """Return True when a notification may be sent at ``now``.

        A mailbox that has never notified (``last_check`` is None) is always
        allowed.  Otherwise the full elapsed time must exceed the backoff;
        ``total_seconds()`` is used because ``timedelta.seconds`` drops whole
        days and would treat "1 day and 10 seconds" as 10 seconds.
        """
        if last_check is None:
            return True
        return (now - last_check).total_seconds() > backoff_seconds

    @staticmethod
    def _backoff_seconds():
        """Read the "backoff" System setting, creating it as "60" if absent."""
        try:
            backoff = System.get(System.key == "backoff")
        except System.DoesNotExist:
            backoff = System(key="backoff", value="60")
            backoff.save()
        return int(backoff.value)

    def check_events(self, messages):
        """Queue events for unknown messages and notify if backoff allows.

        Args:
            messages: iterable of message ids from an IMAP search.

        Returns:
            True if ``on_notify`` was triggered, otherwise False.
        """
        notify = False
        now = None
        backoff_seconds = None  # looked up at most once, and only if needed
        events = Events.select().where(Events.user == self.mailbox.user)
        known_ids = {event.event_id for event in events}
        for uid in messages:
            # Stored ids are compared against the string form of the uid.
            if str(uid) in known_ids:
                continue
            self.log.info("New message")
            now = datetime.now()
            self.event_queue.put({
                "mailbox": self.mailbox,
                "now": now,
                "event_id": uid})
            if self.last_check is not None and backoff_seconds is None:
                backoff_seconds = self._backoff_seconds()
            if self._backoff_elapsed(self.last_check, now, backoff_seconds):
                notify = True

        if notify:
            self.log.debug("Notify")
            self.last_check = now
            self.on_notify()
        return notify

    def on_notify(self):
        """Send the new-mail text to every enabled service of this user."""
        self.log.info("Triggering notifications")
        text = f"New messages have been received at {self.mailbox.imap_user}"
        targets = Services.select().where(Services.user == self.mailbox.user)
        for target in targets:
            if not target.enabled:
                continue
            if target.service == "pushover":
                self.send_pushover(target.target, text)
            elif target.service == "mattermost":
                self.send_mattermost(target.target, text)

    def _post(self, url, **kwargs):
        """POST a notification; log the outcome instead of raising.

        A timeout is always applied so an unresponsive endpoint cannot block
        the worker indefinitely, and request failures are logged so one bad
        target neither skips the remaining targets nor drops the IMAP
        session.  The URL is not logged because it may embed a secret.
        """
        try:
            response = requests.post(url, timeout=self.HTTP_TIMEOUT, **kwargs)
        except requests.RequestException:
            self.log.exception("Notification request failed")
            return
        self.log.debug(response.status_code)
        self.log.debug(response.text)

    def send_pushover(self, po_user, text):
        """Send ``text`` to Pushover user ``po_user`` using the stored token."""
        notifier = Notifier.get(Notifier.service == "pushover")
        url = "https://api.pushover.net/1/messages.json"
        params = {
            "token": notifier.secret,
            "user": po_user,
            "message": text
        }
        self._post(url, data=params)

    def send_mattermost(self, url, text):
        """POST ``text`` as a JSON payload to the webhook at ``url``."""
        params = {
            "text": text
        }
        self._post(url, json=params)
|
|
|
|
|
|
def shutdown_handler(signal, frame):
    """Signal handler that requests shutdown by setting ``_shutdown``.

    Registered for SIGTERM in ``main``.

    Args:
        signal: number of the signal being handled (unused).
        frame: stack frame active when the signal arrived (unused).
    """
    # Must name the module-level event exactly; a misspelling here raises
    # NameError inside the handler and the shutdown flag is never set.
    _shutdown.set()
|
|
|
|
def logger_thread(log_queue):
    """Drain ``log_queue`` and dispatch each record to its named logger.

    Blocks on the queue and handles records one at a time until a ``None``
    sentinel is received, at which point the function returns.
    """
    logging.debug("starting logger thread")
    # iter(callable, sentinel) keeps calling get() until it yields None.
    for record in iter(log_queue.get, None):
        logging.getLogger(record.name).handle(record)
    logging.debug(".break.")
|
|
|
|
def event_thread(event_queue):
    """Persist queued mailbox events until shutdown or a ``None`` sentinel.

    Every dict taken from ``event_queue`` is stored as an ``Events`` row, and
    the mailbox carried in the dict gets its ``last_check`` set to the
    entry's timestamp and is saved.
    """
    logging.debug("starting event thread")
    while not _shutdown.is_set():
        item = event_queue.get()
        if item is None:
            # Sentinel from main: stop consuming.
            logging.debug("..break..")
            break

        box = item['mailbox']
        Events(
            user=box.user,
            mailbox=box.imap_user,
            event_id=item['event_id'],
        ).save()
        box.last_check = item['now']
        box.save()
|
|
|
|
def main():
    """Start one notifier process per mailbox plus the two helper threads.

    Installs the SIGTERM handler, opens the sqlite store named on the command
    line, spawns a daemon process for every ``Mailbox`` row, then runs the
    logging and event-persisting threads in this process.  Waits for the
    workers; on Ctrl+C they are terminated and the shutdown flag is set.
    Finally both queues receive a ``None`` sentinel and the threads are
    joined.
    """
    signal.signal(signal.SIGTERM, shutdown_handler)

    parser = argparse.ArgumentParser(description="IMAP notification suite")
    parser.add_argument(
        '-s', '--store',
        default="store.db",
        help="sqlite database filepath")
    options = parser.parse_args()

    db.init(options.store, pragmas={'journal_mode': 'wal'})
    create_tables()

    log_queue = Queue()
    event_queue = Queue()
    workers = []
    for mailbox in Mailbox.select():
        notifier = MailboxNotifier(mailbox)
        worker = Process(
            target=notifier.start,
            name=mailbox.user,
            args=(log_queue, event_queue),
            daemon=True)
        workers.append(worker)
        worker.start()

    logging.basicConfig(level=logging.DEBUG)
    logging.debug("starting up main")
    log_worker = threading.Thread(target=logger_thread, args=(log_queue,))
    log_worker.start()
    event_worker = threading.Thread(target=event_thread, args=(event_queue,))
    event_worker.start()

    logging.debug(workers)
    try:
        for worker in workers:
            worker.join()

    except KeyboardInterrupt:
        logging.debug("CTRL+C")
        logging.debug("Keyboard interrupt, shutting down from main")
        for worker in workers:
            worker.terminate()
            logging.debug("..terminate..")
        _shutdown.set()

    # Sentinels unblock the helper threads so they can be joined.
    for channel in (log_queue, event_queue):
        channel.put(None)
    for helper in (log_worker, event_worker):
        helper.join()
    logging.debug("exiting main")
|
|
|
|
# Run the notifier only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|