partybot/partybot/pnutbot.py
Morgan McMillian 4ae27d2a00 python package
2021-11-19 15:14:24 -08:00

399 lines
12 KiB
Python

import yaml
import requests
import pnutpy
import websocket
import threading
import logging
import time
import json
import random
import re
import argparse
from sqlalchemy import create_engine, and_
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from partybot.models import Base, Karma, Optout, Queue, Preferences, MdnpRequests
# Flags shared between the websocket callbacks and the keep-alive thread.
# Set by main() before connecting; cleared by subscribe() after the
# "online and ready" greeting has been sent.
_startup = threading.Event()
# NOTE(review): not referenced anywhere in the visible code -- confirm
# whether it is still needed.
_shutdown = threading.Event()
# Set by subscribe() on a successful channel subscription; cleared by
# on_close().
_connected = threading.Event()
# Set by on_error(); stops the keep-alive loop started in on_open().
# Cleared by main() before each run_forever() call.
_error = threading.Event()
def subscribe(connection_id):
    """Subscribe the open stream connection to the configured channel.

    Sends a GET for the channel's messages with ``connection_id`` attached
    as a query parameter. On HTTP 200 the bot is marked as connected and,
    if this is the initial startup, the "online" greeting is posted once.
    Any other status is logged as an error.

    Args:
        connection_id: Connection id reported by the stream in its
            ``meta`` block.
    """
    url = f"https://api.pnut.io/v1/channels/{config['CHANNEL']}/messages"
    headers = {'Authorization': "Bearer " + config['ACCESS_TOKEN']}
    # Let requests build (and URL-encode) the query string instead of
    # concatenating it by hand.
    r = requests.get(
        url,
        params={'connection_id': connection_id},
        headers=headers,
    )
    if r.status_code == 200:
        _connected.set()
        if _startup.is_set():
            # The backslash is escaped explicitly: "\o" is an invalid
            # escape sequence and triggers a warning on newer Pythons.
            send(config['CHANNEL'], "Partybot online and ready! \\o/")
            _startup.clear()
    else:
        logger.error(r)
def send(room, text):
    """Post ``text`` as a new message in channel ``room`` via the pnut API."""
    payload = {'text': text}
    pnutpy.api.create_message(room, data=payload)
def echo(room, text):
    """Log ``text`` at debug level, then post it unchanged to ``room``."""
    logger.debug(text)
    send(room, text)
def help(room):
    """Post the usage summary (karma rules, vote tokens, commands) to ``room``."""
    fragments = (
        "You can earn karma by making #MNDP requests!",
        " Your first request, and every 10 after earns you karma.\n\n",
        "You can upvote or downvote others by mentioning them with the following...\n\n",
        " to vote up: +1, ++, \U0001F44D \n",
        " to vote down: -1, --, \U0001F44E \n",
        "\n",
        "You can use the following commands\n\n",
        "!karma -- show the current karma ratings\n",
        "!optout -- remove yourself from karma ratings\n",
        "!optin -- add yourself to karma ratings\n",
        "!chimpnut -- toggle special ChimPnut alerts\n",
        "!botsnack -- give @partybot a special treat\n",
    )
    send(room, "".join(fragments))
def botsnack(room):
    """Reply in ``room`` with a randomly chosen thank-you for a snack."""
    thanks = (
        "Nom nom nom.",
        "Ooooh",
        "Yummi!",
        "Delightful.",
        "That makes me happy",
        "How kind!",
        "Sweet.",
        "*burp*",
    )
    chosen = random.choice(thanks)
    send(room, chosen)
def botdrink(room):
    """Reply in ``room`` with a randomly chosen thank-you for a drink."""
    thanks = (
        "Hold my beer.",
        "Ooooh",
        "Delightful.",
        "That makes me happy",
        "How kind!",
        "Sweet.",
        "*burp*",
        "Ah.. Hiccup!",
    )
    chosen = random.choice(thanks)
    send(room, chosen)
def optout(msg):
    """Remove the sender from karma tracking and record their opt-out.

    Deletes the sender's Karma and MdnpRequests rows if present, adds an
    Optout row if one does not exist yet, commits, and confirms in the
    channel the command came from.
    """
    # Drop any stored karma and request counter for this user.
    for model in (Karma, MdnpRequests):
        row = model.query.filter(model.userid == msg.user.id).one_or_none()
        if row:
            db_session.delete(row)

    # Record the opt-out only once.
    existing = Optout.query.filter(Optout.userid == msg.user.id).one_or_none()
    if existing is None:
        db_session.add(Optout(userid=msg.user.id))
    db_session.commit()

    send(
        msg.channel_id,
        "@" + msg.user.username + " you have been removed from the karma table",
    )
def optin(msg):
    """Allow the sender to earn karma again.

    Deletes the sender's Optout row if one exists and confirms in the
    channel the command came from.
    """
    entry = Optout.query.filter(Optout.userid == msg.user.id).one_or_none()
    if entry:
        db_session.delete(entry)
        # Commit here like the other handlers do; otherwise the delete
        # stays pending until some unrelated later commit on this session.
        db_session.commit()
    reply = "@" + msg.user.username
    reply += " you are able to earn karma"
    send(msg.channel_id, reply)
def upvote(room, user, prefs):
    """Add one karma point to ``user`` and announce the new total in ``room``.

    A Karma row is created with a value of 1 when the user has none yet.
    The announcement is prefixed with "/karma " when ``prefs.chimpnut`` is
    truthy.
    """
    record = Karma.query.filter(Karma.userid == user.id).one_or_none()
    if record is not None:
        record.karma += 1
    else:
        record = Karma(userid=user.id, chanid=room, karma=1)
        db_session.add(record)
    db_session.commit()

    prefix = "/karma " if prefs.chimpnut else ""
    announcement = (
        prefix + "@" + user.username
        + " now has " + str(record.karma) + " karma in this channel"
    )
    send(room, announcement)
def downvote(room, user, prefs):
    """Remove one karma point from ``user`` and announce the new total in ``room``.

    A Karma row is created with a value of -1 when the user has none yet.
    The announcement is prefixed with "/karma " when ``prefs.chimpnut`` is
    truthy.
    """
    record = Karma.query.filter(Karma.userid == user.id).one_or_none()
    if record is not None:
        record.karma -= 1
    else:
        record = Karma(userid=user.id, chanid=room, karma=-1)
        db_session.add(record)
    db_session.commit()

    prefix = "/karma " if prefs.chimpnut else ""
    announcement = (
        prefix + "@" + user.username
        + " now has " + str(record.karma) + " karma in this channel"
    )
    send(room, announcement)
def karma(room):
    """Post the karma standings for ``room``, highest karma first.

    Each stored user id is resolved to a username through the pnut API.
    """
    standings = (
        Karma.query
        .filter(Karma.chanid == room)
        .order_by(Karma.karma.desc())
        .all()
    )
    lines = ["Karma standings\n\n"]
    for row in standings:
        account, _meta = pnutpy.api.get_user(row.userid)
        lines.append(account.username + ": " + str(row.karma) + "\n")
    send(room, "".join(lines))
def chimpnut(msg):
    """Toggle the sender's ChimPnut alert preference and confirm the new state.

    A sender without a stored preference first gets a record with the
    alert disabled, so their first toggle enables it.
    NOTE(review): on_vote() and on_mndp() create missing preferences with
    chimpnut=True -- confirm which default is intended.
    """
    sender = msg.user
    prefs = Preferences.query.filter(Preferences.userid == sender.id).one_or_none()
    if prefs is None:
        prefs = Preferences(userid=sender.id, chimpnut=False)
        db_session.add(prefs)

    now_enabled = not prefs.chimpnut
    prefs.chimpnut = now_enabled
    if now_enabled:
        reply = "/Mac @" + sender.username + " ChimPnut alert is now enabled"
    else:
        reply = "@" + sender.username + " ChimPnut alert is now disabled"

    db_session.commit()
    send(msg.channel_id, reply)
def on_command(msg):
    """Dispatch a "!"-prefixed chat command to its handler.

    Only the first space-separated token of the message text is examined;
    unknown commands are silently ignored.
    """
    room = msg.channel_id
    command = msg.content.text.split(' ', 1)[0]

    # Handlers that only need the channel id.
    room_handlers = {
        "!help": help,
        "!botsnack": botsnack,
        "!botdrink": botdrink,
        "!karma": karma,
    }
    # Handlers that need the full message (sender and channel).
    message_handlers = {
        "!optout": optout,
        "!optin": optin,
        "!chimpnut": chimpnut,
    }

    if command in room_handlers:
        room_handlers[command](room)
    elif command in message_handlers:
        message_handlers[command](msg)
def on_vote(msg, matcher):
    """Apply a karma vote found in a chat message.

    Args:
        msg: The message containing the vote.
        matcher: Regex match whose group 1 is the candidate username and
            group 2 is the vote token.

    Votes for the configured MNDP user are ignored. Self-votes, unknown
    users and opted-out users each get an explanatory reply instead of a
    karma change.
    """
    nominee, ballot = matcher.group(1), matcher.group(2)
    if nominee == config['MNDPUSER']:
        return

    def tell_voter(suffix):
        # Every refusal is addressed to the voter in the originating channel.
        send(msg.channel_id, "@" + msg.user.username + suffix)

    try:
        target, _meta = pnutpy.api.get_user("@" + nominee)
        if msg.user.username == target.username:
            logger.debug(target.username)
            logger.debug(nominee)
            tell_voter(" silly human, your karma must be decided by others!")
            return
    except pnutpy.errors.PnutMissing:
        tell_voter(" I do not know who that is")
        return

    if Optout.query.filter(Optout.userid == target.id).one_or_none():
        tell_voter(" user has chosen not to receive karma")
        return

    # Make sure the target has a preferences row before announcing karma.
    prefs = Preferences.query.filter(Preferences.userid == target.id).one_or_none()
    if prefs is None:
        prefs = Preferences(userid=target.id, chimpnut=True)
        db_session.add(prefs)
        db_session.commit()

    logger.debug("-- VOTING " + ballot)
    up_tokens = ("++", ":thumbsup:", ":+1:", "+1", "\U0001F44D")
    down_tokens = ("--", ":thumbsdown:", ":-1:", "-1", "\U0001F44E")
    if ballot in up_tokens:
        upvote(msg.channel_id, target, prefs)
    elif ballot in down_tokens:
        downvote(msg.channel_id, target, prefs)
def on_mention(msg):
    """Handle a message that mentions the bot; currently does nothing."""
    # TODO: use for even more magic
    return None
def on_mndp(msg):
    """Count request credits from a message posted by the MNDP user.

    Only messages tagged "NowPlaying" are considered. Every mentioned user
    who has not opted out gets their request counter incremented, and is
    upvoted on their first counted request and on every tenth one.
    Mentions that do not resolve to a pnut user are skipped.
    """
    hashtags = [tag.text for tag in msg.content.entities.tags]
    mentioned = [mention.text for mention in msg.content.entities.mentions]
    if "NowPlaying" not in hashtags:
        return

    for handle in mentioned:
        first_request = False
        try:
            requester, _meta = pnutpy.api.get_user("@" + handle)

            if Optout.query.filter(Optout.userid == requester.id).one_or_none():
                continue

            prefs = Preferences.query.filter(
                Preferences.userid == requester.id
            ).one_or_none()
            if prefs is None:
                prefs = Preferences(userid=requester.id, chimpnut=True)
                db_session.add(prefs)
                db_session.commit()

            tally = MdnpRequests.query.filter(
                MdnpRequests.userid == requester.id
            ).one_or_none()
            if tally is None:
                # A brand new counter earns karma straight away.
                tally = MdnpRequests(userid=requester.id, requests=0)
                db_session.add(tally)
                first_request = True

            tally.requests += 1
            db_session.commit()

            if tally.requests % 10 == 0 or first_request:
                upvote(msg.channel_id, requester, prefs)
        except pnutpy.errors.PnutMissing:
            continue
# Matches "<word><optional whitespace><vote token>": group 1 is the candidate
# username, group 2 the vote token. Compiled once instead of per message.
_VOTE_PATTERN = re.compile(
    r"([\w]+)\s?(\-\-|\+\+|\U0001F44D|\U0001F44E|\:thumbsup\:|\:\+1\:|\:thumbsdown\:|\:-1\:|\+1|-1)"
)


def on_message(ws, message):
    """Websocket callback: route one incoming stream frame.

    Until the bot is connected, a frame carrying a ``connection_id`` is
    used to subscribe to the configured channel. Afterwards, chat-channel
    frames are unpacked and each contained message is dispatched to the
    MNDP, mention, command or vote handler. Frames flagged as deletions
    and messages written by the bot itself are ignored.

    Args:
        ws: The websocket app invoking the callback (unused).
        message: Raw JSON text of the frame.
    """
    logger.debug("on_message: " + message)
    msg = json.loads(message)
    # Tolerate frames without a meta block instead of raising KeyError.
    meta = msg.get('meta') or {}

    if not _connected.is_set() and 'connection_id' in meta:
        if _startup.is_set():
            send(config['CHANNEL'], "...connecting circuits...")
        logger.debug("connection_id: " + meta['connection_id'])
        subscribe(meta['connection_id'])
        return

    if 'data' not in msg:
        return
    if meta.get('channel_type') != "io.pnut.core.chat":
        return
    # The deletion flag applies to the whole frame, so check it once up front.
    if 'is_deleted' in meta:
        return

    for d_item in msg['data']:
        pmsg = pnutpy.models.Message.from_response_data(d_item)

        # Skip the bot's own messages but keep processing the rest of the
        # batch; returning here would drop any messages that follow.
        if pmsg.user.username == config['USERNAME']:
            continue

        if pmsg.user.username == config['MNDPUSER']:
            on_mndp(pmsg)
        elif config['USERNAME'] in [e.text for e in pmsg.content.entities.mentions]:
            on_mention(pmsg)
        elif pmsg.content.text.startswith('!'):
            on_command(pmsg)
        else:
            votes = _VOTE_PATTERN.search(pmsg.content.text)
            if votes:
                on_vote(pmsg, votes)
def on_error(ws, error):
    """Websocket callback: log the error and raise the shared error flag."""
    for entry in ("on_error: !!! ERROR !!!", error):
        logger.error(entry)
    # Signals the keep-alive loop started in on_open() to stop.
    _error.set()
def on_close(ws, status_code, msg):
    """Websocket callback: log the close and mark the bot as disconnected.

    Args:
        ws: The websocket app invoking the callback (unused).
        status_code: Close status code passed by the websocket library.
        msg: Close reason passed by the websocket library.
    """
    # send(config['CHANNEL'], "...shutdown initiated...")
    logger.debug("on_close: ### CLOSED ###")
    # Use an explicit format string: passing status_code as the first
    # argument makes logging treat it as the format and fail to render.
    logger.debug("status_code=%s msg=%s", status_code, msg)
    _connected.clear()
def on_open(ws):
    """Websocket callback: start the keep-alive / outbound-queue thread.

    The thread loops until the shared error flag is set. Each pass posts
    at most one queued message to the configured channel, sends a "." over
    the websocket, then sleeps for three seconds.
    """
    def run(*args):
        while not _error.is_set():
            # Take one queued row per pass. first() copes with several
            # pending rows, whereas one_or_none() raises as soon as more
            # than one message is queued, which would kill this thread.
            # NOTE(review): no explicit ordering is applied -- confirm
            # whether queued messages must go out in insertion order.
            qmsg = Queue.query.first()
            if qmsg:
                send(config['CHANNEL'], qmsg.msg)
                db_session.delete(qmsg)
                db_session.commit()
            ws.send(".")
            time.sleep(3)
        logger.debug("*** terminate ***")

    t = threading.Thread(target=run)
    t.start()
def main():
    """Command-line entry point: configure the bot and run the stream loop.

    Parses ``-d`` (debug logging) and ``-c/--config`` (YAML configuration
    file, default ``config.yaml``), sets up logging, the database session
    and the pnut API token, then keeps the websocket running for as long
    as ``run_forever()`` returns a truthy value.
    """
    # The handlers in this module read these three names as module-level
    # globals; binding them as locals here would leave them undefined
    # (NameError) in every callback.
    global logger, config, db_session

    logger = logging.getLogger()

    a_parser = argparse.ArgumentParser()
    a_parser.add_argument(
        '-d', action='store_true', dest='debug',
        help="debug logging"
    )
    a_parser.add_argument(
        '-c', '--config', default="config.yaml",
        help="configuration file"
    )
    args = a_parser.parse_args()

    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    with open(args.config, 'rb') as config_file:
        config = yaml.load(config_file, Loader=yaml.SafeLoader)

    # Database: one scoped session shared through Base.query.
    engine = create_engine(config['SERVICE_DB'])
    db_session = scoped_session(sessionmaker(bind=engine))
    Base.query = db_session.query_property()
    Base.metadata.create_all(bind=engine)

    pnutpy.api.add_authorization_token(config['ACCESS_TOKEN'])

    ws_url = "wss://stream.pnut.io/v1/user"
    ws_url += "?access_token=" + config['ACCESS_TOKEN']

    # setup the websocket connection
    ws = websocket.WebSocketApp(ws_url, on_message=on_message,
                                on_error=on_error, on_close=on_close)
    ws.on_open = on_open

    r = True
    _startup.set()
    while r:
        # Reset the error flag so the keep-alive thread of the next
        # connection is allowed to run.
        _error.clear()
        r = ws.run_forever()
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()