Move database writes to main process in it's own thread
All checks were successful
dreamfall/clacksme/pipeline/head This commit looks good

This commit is contained in:
Morgan McMillian 2022-12-30 06:55:31 -08:00
parent afc9214211
commit ba12e2c5ba
2 changed files with 40 additions and 13 deletions

View file

@ -17,7 +17,7 @@ class MailboxNotifier(object):
def __init__(self, mailbox):
self.mailbox = mailbox
def start(self, log_queue):
def start(self, log_queue, event_queue):
imap_log = logging.getLogger('imapclient')
imap_log.level = logging.INFO
log_queue_handler = QueueHandler(log_queue)
@ -25,6 +25,7 @@ class MailboxNotifier(object):
self.log.level = logging.DEBUG
self.log.addHandler(log_queue_handler)
imap_log.addHandler(log_queue_handler)
self.event_queue = event_queue
self.log.info(
f"Starting up IMAP connection for {self.mailbox.user}")
@ -75,12 +76,12 @@ class MailboxNotifier(object):
for uid in messages:
if str(uid) not in event_ids:
self.log.info("New message")
new_event = Events(
user=self.mailbox.user,
service="imap",
event_id=uid)
new_event.save()
now = datetime.now()
self.event_queue.put({
"kind": "event",
"user": self.mailbox.user,
"mailbox": self.mailbox.imap_user,
"event_id": uid})
if self.mailbox.last_check is not None:
delta = now - self.mailbox.last_check
if delta.seconds > 60:
@ -90,8 +91,10 @@ class MailboxNotifier(object):
if notify:
self.log.debug("Notify")
self.mailbox.last_check = now
self.mailbox.save()
self.event_queue.put({
"kind": "notify",
"uid": self.mailbox.id,
"now": now})
self.on_notify()
def on_notify(self):
@ -136,7 +139,6 @@ def shutdown_handler(signal, frame):
def logger_thread(log_queue):
logging.debug("starting logger thread")
while True:
logging.debug(".")
entry = log_queue.get()
if entry is None:
logging.debug(".break.")
@ -144,6 +146,25 @@ def logger_thread(log_queue):
logger = logging.getLogger(entry.name)
logger.handle(entry)
def event_thread(event_queue):
logging.debug("starting event thread")
while not _shutdown.is_set():
entry = event_queue.get()
if entry is not None:
if entry['kind'] == "event":
new_event = Events(
user=entry['user'],
mailbox=entry['mailbox'],
event_id=entry['event_id'])
new_event.save()
elif entry['kind'] == "notify":
mailbox = Mailbox.get(id=entry['uid'])
mailbox.last_check = entry['now']
mailbox.save()
else:
logging.debug("..break..")
break
def main():
signal.signal(signal.SIGTERM, shutdown_handler)
@ -154,15 +175,16 @@ def main():
help="sqlite database filepath")
args = a_parser.parse_args()
db.init(args.store)
db.init(args.store, pragmas={'journal_mode': 'wal'})
create_tables()
log_queue = Queue()
event_queue = Queue()
processes = []
for mailbox in Mailbox.select():
imap_process = MailboxNotifier(mailbox)
p = Process(
target=imap_process.start, name=mailbox.user, args=(log_queue,))
target=imap_process.start, name=mailbox.user, args=(log_queue,event_queue,))
processes.append(p)
p.daemon = True
p.start()
@ -171,6 +193,8 @@ def main():
logging.debug("starting up main")
log_thread = threading.Thread(target=logger_thread, args=(log_queue,))
log_thread.start()
e_thread = threading.Thread(target=event_thread, args=(event_queue,))
e_thread.start()
logging.debug(processes)
try:
@ -186,7 +210,9 @@ def main():
_shutdown.set()
log_queue.put(None)
event_queue.put(None)
log_thread.join()
e_thread.join()
logging.debug("exiting main")
if __name__ == "__main__":

View file

@ -1,7 +1,8 @@
from peewee import *
from playhouse.migrate import *
from playhouse.sqlite_ext import SqliteExtDatabase
db = SqliteDatabase(None)
db = SqliteExtDatabase(None)
migrator = SqliteMigrator(db)
db_schema = "1"
@ -23,7 +24,7 @@ class Mailbox(BaseModel):
class Events(BaseModel):
user = CharField()
service = CharField()
mailbox = CharField()
event_id = CharField()
class Notifier(BaseModel):