clacksme/clacksme/clacksme.py
Morgan McMillian c17c5b61c4
All checks were successful
thrrgilag/clacksme/pipeline/head This commit looks good
allow monitor to be skipped when not defined
closes issue #3
2024-01-28 06:20:47 -08:00

245 lines
7.8 KiB
Python

import argparse
import threading
import logging
import signal
import requests
from requests.exceptions import ConnectionError
from multiprocessing import Process, Event, Queue
from logging.handlers import QueueHandler
from imapclient import IMAPClient, exceptions
from datetime import datetime
from clacksme.model import *
# Process-shared shutdown flag (a multiprocessing Event): the worker and
# consumer loops in this file poll it and stop once it has been set.
_shutdown = Event()
class MailboxNotifier(object):
    """Watch one mailbox over IMAP IDLE and send new-mail notifications.

    main() runs ``start`` as the target of a separate process per mailbox.
    New message UIDs are reported through ``event_queue`` and log records
    through ``log_queue``.
    """

    # Seconds to pause before reconnecting after the IMAP session raised, so
    # a persistent failure (bad credentials, server down) is not retried in a
    # tight loop.
    RECONNECT_DELAY = 5
    # Upper bound, in seconds, for each outbound notification HTTP request.
    REQUEST_TIMEOUT = 30

    def __init__(self, mailbox):
        """Remember the mailbox record and its last recorded check time."""
        self.mailbox = mailbox
        self.last_check = mailbox.last_check

    def start(self, log_queue, event_queue):
        """Connect to the mailbox and keep watching it until shutdown.

        Args:
            log_queue: queue that receives this worker's log records
                (including those of the ``imapclient`` logger).
            event_queue: queue that receives one dict per new message.
        """
        queue_handler = QueueHandler(log_queue)
        imap_log = logging.getLogger('imapclient')
        imap_log.setLevel(logging.INFO)
        self.log = logging.getLogger(self.mailbox.user)
        self.log.setLevel(logging.DEBUG)
        self.log.addHandler(queue_handler)
        imap_log.addHandler(queue_handler)
        self.event_queue = event_queue
        self.log.info(
            f"Starting up IMAP connection for {self.mailbox.user}")
        while not _shutdown.is_set():
            try:
                with IMAPClient(self.mailbox.imap_host) as self.imap:
                    self.imap.login(self.mailbox.imap_user,
                                    self.mailbox.imap_pass)
                    self.imap.select_folder(
                        "INBOX", readonly=self.mailbox.read_only)
                    self.log.info("Connected to INBOX")
                    # Catch up on mail that arrived while disconnected.
                    messages = self.imap.search("UNSEEN")
                    if messages:
                        self.check_events(messages)
                    self.imap_idle()
            except Exception:
                self.log.error("IMAP exception of some kind")
                self.log.exception("IMAP")
                # Back off before reconnecting; returns early on shutdown.
                _shutdown.wait(self.RECONNECT_DELAY)

    def imap_idle(self):
        """Sit in IMAP IDLE, checking for unseen mail on RECENT responses.

        Returns when shutdown is requested or the connection aborts; any
        other exception propagates to the caller.
        """
        while not _shutdown.is_set():
            try:
                self.imap.idle()
                self.log.info("Waiting for new messages")
                responses = self.imap.idle_check(timeout=30)
                self.log.debug(responses)
                for resp in responses:
                    if len(resp) == 2 and resp[1] == b"RECENT":
                        # Searching requires leaving IDLE; re-enter it
                        # afterwards so the final idle_done() stays paired.
                        self.imap.idle_done()
                        messages = self.imap.search("UNSEEN")
                        if messages:
                            self.check_events(messages)
                        self.imap.idle()
                self.log.info("IDLE done")
                self.imap.idle_done()
            except IMAPClient.AbortError:
                self.log.error("IMAP abort, break")
                break

    @staticmethod
    def _backoff_elapsed(last_check, now, backoff_seconds):
        """Return True when a notification is due.

        A notification is due when there was no previous check, or when more
        than ``backoff_seconds`` have passed between ``last_check`` and
        ``now``.
        """
        if last_check is None:
            return True
        # total_seconds() covers the whole interval; the ``seconds``
        # attribute alone drops whole days.
        return (now - last_check).total_seconds() > backoff_seconds

    def _backoff_seconds(self):
        """Read the "backoff" System setting, creating it as "60" if absent."""
        try:
            backoff = System.get(System.key == "backoff")
        except System.DoesNotExist:
            backoff = System(key="backoff", value="60")
            backoff.save()
        return int(backoff.value)

    def check_events(self, messages):
        """Queue an event for each unknown UID and notify if backoff allows.

        Args:
            messages: iterable of message UIDs returned by an IMAP search.
        """
        notify = False
        now = None
        backoff_seconds = None
        events = Events.select().where(Events.user == self.mailbox.user)
        known_ids = {event.event_id for event in events}
        for uid in messages:
            if str(uid) in known_ids:
                continue
            self.log.info("New message")
            now = datetime.now()
            self.event_queue.put({
                "mailbox": self.mailbox,
                "now": now,
                "event_id": uid})
            if self.last_check is None:
                notify = True
                continue
            if backoff_seconds is None:
                # Read the setting once per call, and only when needed.
                backoff_seconds = self._backoff_seconds()
            if self._backoff_elapsed(self.last_check, now, backoff_seconds):
                notify = True
        if notify:
            self.log.debug("Notify")
            self.last_check = now
            self.on_notify()

    def on_notify(self):
        """Send the new-mail text to every enabled service of this user.

        A failed HTTP request for one target is logged and does not stop
        delivery to the remaining targets.
        """
        self.log.info("Triggering notifications")
        text = f"New messages have been received at {self.mailbox.imap_user}"
        targets = Services.select().where(Services.user == self.mailbox.user)
        for target in targets:
            if not target.enabled:
                continue
            try:
                if target.service == "pushover":
                    self.send_pushover(target.target, text)
                elif target.service == "mattermost":
                    self.send_mattermost(target.target, text)
                elif target.service == "http_get":
                    self.send_http_get(target.target)
            except requests.exceptions.RequestException:
                self.log.exception(
                    f"Notification via {target.service} failed")

    def send_pushover(self, po_user, text):
        """POST ``text`` to Pushover for ``po_user``.

        The token sent is the ``secret`` of the Notifier row whose service
        is "pushover".
        """
        notifier = Notifier.get(Notifier.service == "pushover")
        url = "https://api.pushover.net/1/messages.json"
        params = {
            "token": notifier.secret,
            "user": po_user,
            "message": text,
        }
        response = requests.post(
            url, data=params, timeout=self.REQUEST_TIMEOUT)
        self.log.debug(response.status_code)
        self.log.debug(response.text)

    def send_mattermost(self, url, text):
        """POST ``text`` as a JSON ``{"text": ...}`` payload to ``url``."""
        params = {
            "text": text,
        }
        response = requests.post(
            url, json=params, timeout=self.REQUEST_TIMEOUT)
        self.log.debug(response.status_code)
        self.log.debug(response.text)

    def send_http_get(self, url):
        """Issue a plain GET request to ``url``."""
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        self.log.debug(response.status_code)
        self.log.debug(response.text)
def shutdown_handler(signal, frame):
    """Signal handler: request shutdown by setting the shared flag.

    Args:
        signal: number of the signal being handled (unused).
        frame: stack frame active when the signal arrived (unused).
    """
    _shutdown.set()
def logger_thread(log_queue, url):
    """Re-emit queued log records locally and ping the monitor URL.

    Runs until a ``None`` sentinel is read from ``log_queue``.

    Args:
        log_queue: queue of ``logging.LogRecord`` objects; ``None`` ends
            the loop.
        url: monitor URL requested after each handled record, or ``None``
            to skip the heartbeat.
    """
    logging.debug("starting logger thread")
    while True:
        entry = log_queue.get()
        if entry is None:
            logging.debug(".break.")
            break
        logging.getLogger(entry.name).handle(entry)
        if url is None:
            continue
        try:
            # Bounded wait: an unresponsive monitor must not stall log
            # delivery indefinitely.
            requests.get(url, timeout=10)
        except requests.exceptions.RequestException:
            # The heartbeat is best-effort; a failed ping (connection
            # error, timeout, ...) must not stop the logger thread.
            pass
def event_thread(event_queue):
    """Persist queued new-message events until shutdown or a sentinel.

    Each queue item is a dict with "mailbox", "now" and "event_id" keys: an
    Events row is saved for it and the mailbox's ``last_check`` is updated.
    A ``None`` item ends the loop.
    """
    logging.debug("starting event thread")
    while not _shutdown.is_set():
        item = event_queue.get()
        if item is None:
            logging.debug("..break..")
            break
        box = item['mailbox']
        record = Events(
            user=box.user,
            mailbox=box.imap_user,
            event_id=item['event_id'])
        record.save()
        box.last_check = item['now']
        box.save()
def main():
    """Entry point: start one watcher process per mailbox and wait on them.

    Parses the command line, opens the sqlite store, launches a
    MailboxNotifier process for every Mailbox row, and runs the log and
    event consumer threads until the watchers finish or CTRL+C is pressed.
    """
    signal.signal(signal.SIGTERM, shutdown_handler)

    parser = argparse.ArgumentParser(description="IMAP notification suite")
    parser.add_argument(
        '-s', '--store', default="store.db",
        help="sqlite database filepath")
    options = parser.parse_args()

    db.init(options.store, pragmas={'journal_mode': 'wal'})
    create_tables()

    # The monitor is optional: a missing row or a value shorter than eight
    # characters disables it.
    monitor_url = None
    monitor_row = System.get_or_none(System.key == "monitor_url")
    if monitor_row is not None and len(monitor_row.value) >= 8:
        monitor_url = monitor_row.value

    log_queue = Queue()
    event_queue = Queue()

    processes = []
    for mailbox in Mailbox.select():
        notifier = MailboxNotifier(mailbox)
        worker = Process(
            target=notifier.start, name=mailbox.user,
            args=(log_queue, event_queue), daemon=True)
        processes.append(worker)
        worker.start()

    # Root logging is configured only after the workers have been launched.
    logging.basicConfig(level=logging.DEBUG)
    logging.debug("starting up main")

    log_thread = threading.Thread(
        target=logger_thread, args=(log_queue, monitor_url))
    log_thread.start()
    e_thread = threading.Thread(target=event_thread, args=(event_queue,))
    e_thread.start()
    logging.debug(processes)

    try:
        for worker in processes:
            worker.join()
    except KeyboardInterrupt:
        logging.debug("CTRL+C")
        logging.debug("Keyboard interrupt, shutting down from main")
        for worker in processes:
            worker.terminate()
        logging.debug("..terminate..")

    # Stop the consumers: set the flag and push a sentinel on each queue.
    _shutdown.set()
    log_queue.put(None)
    event_queue.put(None)
    log_thread.join()
    e_thread.join()
    logging.debug("exiting main")


if __name__ == "__main__":
    main()