partybot/partybot/pnutbot.py
Morgan McMillian 5916dd1508
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
add support for a passive monitor url
2023-03-17 12:23:07 -07:00

432 lines
12 KiB
Python

import yaml
import requests
import pnutpy
import websocket
import threading
import logging
import time
import json
import random
import re
import argparse
import os
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from partybot.models import Base, Karma, Optout, Queue, Preferences, MdnpRequests
_startup = threading.Event()
_shutdown = threading.Event()
_connected = threading.Event()
_error = threading.Event()
logger = logging.getLogger()
config = {}
db_session = None
SNAP_DATA = os.environ.get('SNAP_DATA')
def subscribe(connection_id):
url = f"https://api.pnut.io/v1/channels/{config['CHANNEL']}/messages"
url += "?connection_id=" + connection_id
headers = {'Authorization': "Bearer " + config['ACCESS_TOKEN']}
r = requests.get(url, headers=headers)
if r.status_code == 200:
_connected.set()
if _startup.isSet():
send(config['CHANNEL'], "Partybot online and ready! \o/")
_startup.clear()
else:
logger.error(r)
def send(room, text):
pnutpy.api.create_message(room, data={'text': text})
def echo(room, text):
logger.debug(text)
send(room, text)
def help(room):
reply = "You can earn karma by making #MNDP requests!"
reply += " Your first request, and every 10 after earns you karma.\n\n"
reply += "You can upvote or downvote others by mentioning them with the following...\n\n"
reply += " to vote up: +1, ++, \U0001F44D \n"
reply += " to vote down: -1, --, \U0001F44E \n"
reply += "\n"
reply += "You can use the following commands\n\n"
reply += "!karma -- show the current karma ratings\n"
reply += "!optout -- remove yourself from karma ratings\n"
reply += "!optin -- add yourself to karma ratings\n"
reply += "!chimpnut -- toggle special ChimPnut alerts\n"
reply += "!botsnack -- give @partybot a special treat\n"
send(room, reply)
def botsnack(room):
replies = [
"Nom nom nom.",
"Ooooh",
"Yummi!",
"Delightful.",
"That makes me happy",
"How kind!",
"Sweet.",
"*burp*",
]
send(room, random.choice(replies))
def botdrink(room):
replies = [
"Hold my beer.",
"Ooooh",
"Delightful.",
"That makes me happy",
"How kind!",
"Sweet.",
"*burp*",
"Hmmm tasty",
"Hey, this glass is empty!"
]
send(room, random.choice(replies))
def optout(msg):
karma = Karma.query.filter(Karma.userid == msg.user.id).one_or_none()
if karma:
db_session.delete(karma)
counter = MdnpRequests.query.filter(MdnpRequests.userid == msg.user.id).one_or_none()
if counter:
db_session.delete(counter)
entry = Optout.query.filter(Optout.userid == msg.user.id).one_or_none()
if entry is None:
entry = Optout(userid=msg.user.id)
db_session.add(entry)
db_session.commit()
reply = "@" + msg.user.username
reply += " you have been removed from the karma table"
send(msg.channel_id, reply)
def optin(msg):
entry = Optout.query.filter(Optout.userid == msg.user.id).one_or_none()
if entry:
db_session.delete(entry)
reply = "@" + msg.user.username
reply += " you are able to earn karma"
send(msg.channel_id, reply)
def upvote(room, user, prefs):
karma = Karma.query.filter(Karma.userid == user.id).one_or_none()
if karma is None:
karma = Karma(userid=user.id, chanid=room, karma=1)
db_session.add(karma)
else:
karma.karma = karma.karma + 1
db_session.commit()
if prefs.chimpnut:
prefix = "/karma "
else:
prefix = ""
reply = prefix + "@" + user.username
reply += " now has " + str(karma.karma) + " karma in this channel"
send(room, reply)
def downvote(room, user, prefs):
karma = Karma.query.filter(Karma.userid == user.id).one_or_none()
if karma is None:
karma = Karma(userid=user.id, chanid=room, karma=-1)
db_session.add(karma)
else:
karma.karma = karma.karma - 1
db_session.commit()
if prefs.chimpnut:
prefix = "/karma "
else:
prefix = ""
reply = prefix + "@" + user.username
reply += " now has " + str(karma.karma) + " karma in this channel"
send(room, reply)
def karma(room):
reply = "Karma standings\n\n"
results = Karma.query.filter(Karma.chanid == room).order_by(Karma.karma.desc()).all()
for entry in results:
user, meta = pnutpy.api.get_user(entry.userid)
reply += user.username + ": " + str(entry.karma) + "\n"
send(room, reply)
def chimpnut(msg):
prefs = Preferences.query.filter(Preferences.userid == msg.user.id).one_or_none()
if prefs is None:
prefs = Preferences(userid=msg.user.id, chimpnut=False)
db_session.add(prefs)
if prefs.chimpnut:
prefs.chimpnut = False
reply = "@" + msg.user.username + " ChimPnut alert is now disabled"
else:
prefs.chimpnut = True
reply = "/Mac @" + msg.user.username + " ChimPnut alert is now enabled"
db_session.commit()
send(msg.channel_id, reply)
def on_command(msg):
room = msg.channel_id
args = msg.content.text.split(' ', 1)
if args[0] == "!help":
help(msg.channel_id)
elif args[0] == "!botsnack":
botsnack(msg.channel_id)
elif args[0] == "!botdrink":
botdrink(msg.channel_id)
elif args[0] == "!optout":
optout(msg)
elif args[0] == "!optin":
optin(msg)
elif args[0] == "!chimpnut":
chimpnut(msg)
elif args[0] == "!karma":
karma(msg.channel_id)
def on_vote(msg, matcher):
canidate = matcher.group(1)
vote = matcher.group(2)
if canidate == config['MNDPUSER']:
return
try:
pnutuser, meta = pnutpy.api.get_user("@" + canidate)
if msg.user.username == pnutuser.username:
logger.debug(pnutuser.username)
logger.debug(canidate)
reply = "@" + msg.user.username
reply += " silly human, your karma must be decided by others!"
send(msg.channel_id, reply)
return
except pnutpy.errors.PnutMissing:
reply = "@" + msg.user.username
reply += " I do not know who that is"
send(msg.channel_id, reply)
return
optout = Optout.query.filter(Optout.userid == pnutuser.id).one_or_none()
if optout:
reply = "@" + msg.user.username
reply += " user has chosen not to receive karma"
send(msg.channel_id, reply)
return
prefs = Preferences.query.filter(Preferences.userid == pnutuser.id).one_or_none()
if prefs is None:
prefs = Preferences(userid=pnutuser.id, chimpnut=True)
db_session.add(prefs)
db_session.commit()
logger.debug("-- VOTING " + vote)
upvotes = [
"++",
":thumbsup:",
":+1:",
"+1",
"\U0001F44D"
]
downvotes = [
"--",
":thumbsdown:",
":-1:",
"-1",
"\U0001F44E"
]
if vote in upvotes:
upvote(msg.channel_id, pnutuser, prefs)
elif vote in downvotes:
downvote(msg.channel_id, pnutuser, prefs)
def on_mention(msg):
# TODO: use for even more magic
return
def on_mndp(msg):
tags = [e.text for e in msg.content.entities.tags]
mentions = [e.text for e in msg.content.entities.mentions]
if "NowPlaying" not in tags:
return
for m in mentions:
addkarma = False
try:
pnutuser, meta = pnutpy.api.get_user("@" + m)
optout = Optout.query.filter(Optout.userid == pnutuser.id).one_or_none()
if optout:
continue
prefs = Preferences.query.filter(Preferences.userid == pnutuser.id).one_or_none()
if prefs is None:
prefs = Preferences(userid=pnutuser.id, chimpnut=True)
db_session.add(prefs)
db_session.commit()
entry = MdnpRequests.query.filter(MdnpRequests.userid == pnutuser.id).one_or_none()
if entry is None:
entry = MdnpRequests(userid=pnutuser.id, requests=0)
db_session.add(entry)
addkarma = True
entry.requests = entry.requests + 1
db_session.commit()
if entry.requests % 10 == 0 or addkarma:
upvote(msg.channel_id, pnutuser, prefs)
except pnutpy.errors.PnutMissing:
continue
def on_message(ws, message):
logger.debug("on_message: " + message)
msg = json.loads(message)
if not _connected.isSet() and 'connection_id' in msg['meta']:
if _startup.isSet():
send(config['CHANNEL'], "...connecting circuits...")
logger.debug("connection_id: " + msg['meta']['connection_id'])
subscribe(msg['meta']['connection_id'])
return
if 'data' in msg:
if "channel_type" in msg['meta'] and msg['meta']['channel_type'] == "io.pnut.core.chat":
for d_item in msg['data']:
pmsg = pnutpy.models.Message.from_response_data(d_item)
if 'is_deleted' in msg['meta']:
return
vpattern = r"([\w]+)\s?(\-\-|\+\+|\U0001F44D|\U0001F44E|\:thumbsup\:|\:\+1\:|\:thumbsdown\:|\:-1\:|\+1|-1)"
votes = re.search(vpattern, pmsg.content.text)
if pmsg.user.username == config['USERNAME']:
return
if pmsg.user.username == config['MNDPUSER']:
on_mndp(pmsg)
elif config['USERNAME'] in [e.text for e in pmsg.content.entities.mentions]:
on_mention(pmsg)
elif pmsg.content.text.startswith('!'):
on_command(pmsg)
elif votes:
on_vote(pmsg, votes)
def on_error(ws, error):
logger.error("on_error: !!! ERROR !!!")
logger.error(error)
_error.set()
def on_close(ws, status_code, msg):
# send(config['CHANNEL'], "...shutdown initiated...")
logger.debug("on_close: ### CLOSED ###")
logger.debug(status_code, msg)
_connected.clear()
def on_open(ws):
def run(*args):
if "MONITOR_URL" in config:
r = requests.get(config['MONITOR_URL'])
step = 0
while not _error.isSet():
qmsg = Queue.query.one_or_none()
if qmsg:
send(config['CHANNEL'], qmsg.msg)
db_session.delete(qmsg)
db_session.commit()
ws.send(".")
if "MONITOR_URL" in config and step > 5:
r = requests.get(config['MONITOR_URL'])
step = 0
elif "MONITOR_URL" in config:
step += 1
time.sleep(5)
logger.debug("*** terminate ***")
t = threading.Thread(target=run)
t.start()
def main():
global config
global db_session
a_parser = argparse.ArgumentParser()
a_parser.add_argument(
'-d', action='store_true', dest='debug',
help="debug logging"
)
a_parser.add_argument(
'-c', '--config', default="config.yaml",
help="configuration file"
)
a_parser.add_argument(
'-s', '--store', default="store.db",
help="database store url"
)
args = a_parser.parse_args()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
if SNAP_DATA is None:
filename = args.config
db_url = args.store
else:
filename = SNAP_DATA + '/config.yaml'
db_url = 'sqlite:///' + SNAP_DATA + '/store.db'
with open(filename, 'rb') as config_file:
config = yaml.load(config_file, Loader=yaml.SafeLoader)
engine = create_engine(db_url)
db_session = scoped_session(sessionmaker(bind=engine))
Base.query = db_session.query_property()
Base.metadata.create_all(bind=engine)
if len(config['ACCESS_TOKEN']) <= 1:
logger.error("Error, invalid access token")
logger.error(f"You may not have updated " + filename)
exit(-1)
pnutpy.api.add_authorization_token(config['ACCESS_TOKEN'])
ws_url = "wss://stream.pnut.io/v1/user"
ws_url += "?access_token=" + config['ACCESS_TOKEN']
# setup the websocket connection
ws = websocket.WebSocketApp(ws_url, on_message=on_message,
on_error=on_error, on_close=on_close)
ws.on_open = on_open
r = True
_startup.set()
while r:
_error.clear()
r = ws.run_forever()
if __name__ == "__main__":
main()