paritybot

A fun IRC bot.
git clone https://git.sr.ht/~jbauer/paritybot
Log | Files | Refs | README | LICENSE

bot.py (12712B)


      1 # TODO: Make separate adminhelp command for admin-only commands
      2 
      3 import sys
      4 import signal
      5 import socket
      6 import ssl
      7 import json
      8 import time
      9 import base64
     10 import random
     11 import requests
     12 from threading import Timer
     13 from datetime import datetime
     14 from pytz import timezone
     15 
     16 
     17 # RepeatedTimer class from Stackoverflow question: https://stackoverflow.com/questions/3393612
     18 # Author of Question: https://stackoverflow.com/users/374797/john-howard
     19 # Answer: https://stackoverflow.com/a/13151299
     20 # Author of Answer: https://stackoverflow.com/users/624066
     21 class RepeatedTimer(object):
     22     def __init__(self, interval, function, *args, **kwargs):
     23         self._timer     = None
     24         self.interval   = interval
     25         self.function   = function
     26         self.args       = args
     27         self.kwargs     = kwargs
     28         self.is_running = False
     29         self.start()
     30 
     31     def _run(self):
     32         self.is_running = False
     33         self.start()
     34         self.function(*self.args, **self.kwargs)
     35 
     36     def start(self):
     37         if not self.is_running:
     38             self._timer = Timer(self.interval, self._run)
     39             self._timer.start()
     40             self.is_running = True
     41 
     42     def stop(self):
     43         self._timer.cancel()
     44         self.is_running = False
     45 
     46 
     47 class Bot:
     48     def __init__(self):
     49         self.VERSION = "v0.8.0"
     50         self.AUTHOR = "Jake Bauer (jbauer)"
     51         self.ircsock = None
     52         self.server = ""
     53         self.port = 0
     54         self.nick = ""
     55         self.password = ""
     56         self.channels = []
     57         self.admins = []
     58         self.eightBallResponses = []
     59         self.quotesfile = ""
     60         self.commandList = {
     61             "help": "+help: List all available commands",
     62             "version": "+version: Print information about the bot",
     63             "quit": "+quit: End this session",
     64             "quote": "+quote: Print a random quote",
     65             "time": "+time <timezone>: Print the time in a given timezone",
     66             "roll": "+roll <number>: Print a random number in a given range",
     67             "weather": "+weather <city>: Print the weather in a given city",
     68             "8ball": "+8ball: Correctly and accurately predict the future",
     69             "repeat": "+repeat <message>: Print a given message",
     70             "addadmin": "+addadmin <user>: Give a user admin privileges",
     71             "rmadmin": "+rmadmin <user>: Revoke a user's admin privileges",
     72             "admins": "+admins: List admins",
     73             "chgpass": "+chgpass <password>: Change the bot's password to a given string",
     74         }
     75         self.exception = ""
     76 
     77     def load_config(self):
     78         try:
     79             with open ("config.json", "r") as file:
     80                 config = json.load(file)
     81                 self.server = config.get("server")
     82                 self.port = config.get("port")
     83                 self.nick = config.get("nick")
     84                 self.password = config.get("password") or ""
     85                 self.channels = config.get("channels") or []
     86                 self.admins = config.get("admins") or []
     87                 self.eightBallResponses = config.get("eightBallResponses") or []
     88                 self.quotesfile = config.get("quotesfile") or ""
     89                 if not self.server or not self.port or not self.nick:
     90                     sys.stderr.write("Missing value for server, port, or nick, check your config file!\n")
     91                     sys.exit(1)
     92         except Exception as e:
     93             sys.stderr.write("Could not open config file: " + str(e) + "\n")
     94             sys.exit(1)
     95 
     96     def save_config(self):
     97         try:
     98             with open ("config.json", "w") as file:
     99                 config = {
    100                     "server": self.server,
    101                     "port": self.port,
    102                     "nick": self.nick,
    103                     "password": self.password,
    104                     "channels": self.channels,
    105                     "admins": self.admins,
    106                     "eightBallResponses": self.eightBallResponses,
    107                     "quotesfile": self.quotesfile
    108                 }
    109                 json.dump(config, file, indent=4)
    110         except Exception as e:
    111             sys.stderr.write("Could not open config file: " + str(e) + "\n")
    112 
    113     def listen(self):
    114         incoming = self.ircsock.recv(2048).decode('utf-8').strip('\n\r')
    115         print("\033[32m>>\033[0m " + incoming, flush=True)
    116         return incoming
    117 
    118     def waitfor(self, message):
    119         while True:
    120             incoming = self.listen()
    121             if message in incoming:
    122                 return incoming
    123 
    def get_channel(self, message):
        """Return the third space-separated field of a raw IRC line.

        For a line such as ``:nick!user@host PRIVMSG #chan :text`` this is
        the message target (``#chan``). Raises ``IndexError`` when the line
        has fewer than three fields.
        """
        # Only the first three fields matter, so stop splitting after them.
        fields = message.split(' ', 3)
        return fields[2]
    126 
    def get_user(self, message):
        """Return the sender's nick from a raw IRC line.

        The nick is the part of the first field before ``!``, with the
        first character (the leading ``:`` of a prefix) dropped.
        """
        prefix = message.partition(' ')[0]
        nick_with_colon = prefix.partition('!')[0]
        return nick_with_colon[1:]
    129 
    130     def is_admin(self, user):
    131         if user in self.admins:
    132             return True
    133         return False
    134 
    135     def send_message(self, message):
    136         message = message + "\n"
    137         self.ircsock.sendall(message.encode('utf-8'))
    138         print("\033[31m<<\033[0m " + message, end='', flush=True)
    139 
    140     def send_privmsg(self, channel, message):
    141         self.send_message("PRIVMSG " + channel + " :" + message)
    142 
    143     def announce(self, message):
    144         for channel in self.channels:
    145             self.send_privmsg(channel, message)
    146 
    147     def handle_exception(self, incoming, exception):
    148         self.exception = exception
    149         self.send_privmsg(self.get_channel(incoming), "An unexpected error occurred. Run +showexception to see what went wrong.")
    150 
    151     def repeat(self, channel, message):
    152         self.send_privmsg(channel, message)
    153 
    def connect(self):
        """Open a TLS connection to the server, register, log in and join.

        Sends USER and NICK, tries SASL authentication and falls back to
        NickServ when SASL does not succeed, then joins every configured
        channel. Socket and TLS errors propagate to the caller.
        """
        sys.stderr.write("Connecting to: " + self.server + ":" + str(self.port) + "\n")
        # create_connection resolves the host itself, so IPv6-only servers
        # work too (the previous code was limited to AF_INET).
        raw = socket.create_connection((self.server, self.port))
        # ssl.wrap_socket() was removed in Python 3.12 and never verified
        # the peer. A default context checks the certificate chain and the
        # host name.
        # NOTE(review): servers with self-signed certificates will now be
        # rejected - confirm that this is acceptable for your deployment.
        context = ssl.create_default_context()
        try:
            self.ircsock = context.wrap_socket(raw, server_hostname=self.server)
        except Exception:
            # Do not leak the plain socket when the handshake fails.
            raw.close()
            raise
        self.send_message("USER " + self.nick + " 0 * " + self.nick)
        self.send_message("NICK " + self.nick)
        # Short-circuit: NickServ is only tried when SASL did not succeed.
        self.sasl_auth() or self.nickserv_auth()
        for channel in self.channels:
            self.send_message("JOIN " + channel)
    164 
    165     def sasl_auth(self):
    166         self.send_message("CAP REQ :sasl")
    167         incoming = self.waitfor("CAP")
    168         if "ACK :sasl" in incoming:
    169             self.send_message("AUTHENTICATE PLAIN")
    170             self.waitfor("+")
    171             creds = self.nick + "\0" + self.nick + "\0" + self.password
    172             creds = base64.b64encode(creds.encode('utf8')).decode('utf8')
    173             self.send_message("AUTHENTICATE " + creds)
    174             incoming = self.waitfor(":SASL authentication")
    175             if ("903" in incoming):
    176                 self.send_message("CAP END")
    177                 return True
    178             else:
    179                 return False
    180         else:
    181             return False
    182 
    183     def nickserv_auth(self):
    184         self.send_privmsg("NickServ", "IDENTIFY " + self.password)
    185         incoming = self.waitfor("NickServ")
    186         if ("You are now identified"):
    187             return True
    188         else:
    189             return False
    190 
    191     def disconnect(self):
    192         self.send_message("QUIT :Quitting...")
    193         self.ircsock.close()
    194 
    195     def quit (self, incoming):
    196         if self.is_admin(self.get_user(incoming)):
    197             self.send_privmsg(self.get_channel(incoming), "*sigh* alright then...")
    198             self.disconnect()
    199             self.save_config()
    200             return True
    201         else:
    202             self.send_privmsg(self.get_channel(incoming), "You don't have permission to use this command.")
    203             return False
    204 
    205     def command_help(self, incoming):
    206         try:
    207             self.send_privmsg(self.get_channel(incoming), self.commandList[incoming.split(' ')[4]])
    208         except:
    209             self.send_privmsg(self.get_channel(incoming), "Available commands: " + " ".join(("+" + x) for x in self.commandList.keys()))
    210             self.send_privmsg(self.get_channel(incoming), "+help <command> for more information")
    211 
    212     def command_version(self, incoming):
    213         self.send_privmsg(self.get_channel(incoming), self.nick + "version " + self.VERSION + " by " + self.AUTHOR)
    214 
    215     def command_quote(self, incoming):
    216         with open(self.quotesfile, "r") as f:
    217             lines = f.read().splitlines()
    218             selectedLine = random.choice(lines)
    219             self.send_privmsg(self.get_channel(incoming), selectedLine)
    220 
    221     def command_time(self, incoming):
    222         try:
    223             message = datetime.now(timezone(incoming.split(' ')[4])).strftime('%Y-%m-%d %H:%M:%S')
    224         except:
    225             message = "Please specify a valid timezone (e.g. +time America/Toronto)."
    226         self.send_privmsg(self.get_channel(incoming), message)
    227 
    228     def command_roll(self, incoming):
    229         try:
    230             message = str(random.randint(1, int(incoming.split(' ')[4])))
    231         except:
    232             message = "Please specify a maximum number (e.g. '+roll 20')."
    233         self.send_privmsg(self.get_channel(incoming), message)
    234 
    235     def command_weather(self, incoming):
    236         try:
    237             city = incoming.split(' ')[4]
    238             url = "https://wttr.in/" + city + "?m&format=3"
    239             response = requests.get(url)
    240             message = response.text.strip('\n\r')
    241         except:
    242             message = "Please specify a city (e.g. '+weather Toronto')."
    243         self.send_privmsg(self.get_channel(incoming), message)
    244 
    245     def command_eightball(self, incoming):
    246         try:
    247             incoming.split(' ')[4]
    248             message = random.choice(self.eightBallResponses)
    249         except:
    250             message = "Please ask me a question."
    251         self.send_privmsg(self.get_channel(incoming), message)
    252 
    253     def command_repeatsetup(self, incoming):
    254         if not self.is_admin(self.get_user(incoming)):
    255             message = "You don't have permission to use this command."
    256         else:
    257             try:
    258                 channel = self.get_channel(incoming)
    259                 interval = int(incoming.split(' ')[4])
    260                 message = ' '.join(incoming.split(' ')[5:])
    261                 rt = RepeatedTimer(interval, self.repeat, channel, message)
    262                 message = "Repeating message every " + interval + " seconds."
    263             except:
    264                 message = "Invalid syntax. Expected +repeat <seconds> <message>"
    265         self.send_privmsg(self.get_channel(incoming), message)
    266 
    267     def command_admins(self, incoming):
    268         self.send_privmsg(self.get_channel(incoming), "Admins: " + ', '.join(self.admins))
    269 
    270     def command_addadmin(self, incoming):
    271         try:
    272             if self.is_admin(self.get_user(incoming)):
    273                 candidate = incoming.split(' ')[4]
    274                 if self.is_admin(candidate):
    275                     message = candidate + " is already an admin."
    276                 else:
    277                     self.admins.append(candidate)
    278                     message = candidate + " granted admin privileges."
    279             else:
    280                 message = "You don't have permission to use this command."
    281         except:
    282             message = "Invalid syntax. Expected +addadmin <nick>"
    283         self.send_privmsg(self.get_channel(incoming), message)
    284 
    285     def command_rmadmin(self, incoming):
    286         try:
    287             if self.is_admin(self.get_user(incoming)):
    288                 candidate = incoming.split(' ')[4]
    289                 if not self.is_admin(candidate):
    290                     message = candidate + " is not an admin."
    291                 else:
    292                     self.admins.remove(candidate)
    293                     message = "Revoked " + candidate + "'s admin privileges."
    294             else:
    295                 message = "You don't have permission to use this command."
    296         except:
    297             message = "Invalid syntax. Expected +rmadmin <nick>"
    298         self.send_privmsg(self.get_channel(incoming), message)
    299 
    300     def command_showexception(self, incoming):
    301         if self.is_admin(self.get_user(incoming)):
    302             if not self.exception:
    303                 message = "No exceptions occurred... yet."
    304             else:
    305                 message = str(self.exception)
    306         else:
    307             message = "You don't have permission to use this command."
    308         self.send_privmsg(self.get_channel(incoming), message)
    309 
    310     def command_chgpass(self, incoming):
    311         try:
    312             password = incoming.split(' ')[4]
    313             if self.is_admin(self.get_user(incoming)):
    314                 self.send_privmsg("NickServ", "SET PASSWORD " + password)
    315                 response = self.waitfor("NickServ")
    316                 if "successfully" in response:
    317                     self.password = password
    318                     message = + "Password changed."
    319                 else:
    320                     message = + "Failed to change password."
    321         except:
    322             message = "Invalid syntax. Expected +chgpass <new_password>"
    323         self.send_privmsg(self.get_channel(incoming), message)