python-xmpp-bot

A simple python-based XMPP bot.
git clone https://git.sr.ht/~jbauer/python-xmpp-bot
Log | Files | Refs | README | LICENSE

commit a5f234735013d539f0dc70e06e3bb52238430991
parent 6b0314dadf5884dadad7959d7c56a60ba661a47b
Author: Jake Bauer <jbauer@paritybit.ca>
Date:   Sat, 28 Nov 2020 04:10:06 -0500

Vastly expand bot features and add config file

Bot now has a config file from which to pull settings, the python code
has been cleaned up and organized quite a bit, and various commands have
been added to the bot.

Diffstat:
Mbot.py | 330++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
Abotconfig.default.json | 9+++++++++
2 files changed, 278 insertions(+), 61 deletions(-)

diff --git a/bot.py b/bot.py @@ -1,18 +1,42 @@ #!/usr/bin/env python3 +# Copyright (C) 2020 Jake Bauer <jbauer@paritybit.ca> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +# For basic functionality import xmpp import sys -import os import signal +import json +import re + +# For fun and features +import random +import requests +from datetime import datetime +from pytz import timezone -jabberid = "bot@xmpp.paritybit.ca" -password = "secret" -receivers = ["jbauer@xmpp.paritybit.ca", "general@conference.xmpp.paritybit.ca"] +# Used for restarting the bot +restarting = False +invoker = "" def signal_handler(sig, frame): try: sys.stderr.write("\nSIGINT caught, exiting...\n") - client.disconnect() + bot.announce("Bot shutting down.") + bot.jabber.disconnect() sys.exit(0) except: sys.exit(0) @@ -20,88 +44,272 @@ def signal_handler(sig, frame): signal.signal(signal.SIGINT, signal_handler) class Bot: - - def __init__(self, jabber): - self.jabber = jabber + def __init__(self): + self.jabber = None + self.connection = None + self.jabberid = "" + self.password = "" + self.nick = "" + self.quotesfile = "" + self.rooms = [] + self.eightBallResponses = [] + self.admins = [] def register_handlers(self): self.jabber.RegisterHandler('message',self.xmpp_message) - # self.jabber.RegisterDefaultHandler(self.default_handler) - def default_handler(self, con, event): - print(event) - return + def send_message(self, recipient, message, typ): + m = xmpp.protocol.Message(to=recipient, body=message, typ=typ) + self.jabber.send(m) + + def check_admin(self, sender): + if sender in self.admins: + sys.stdout.write(sender + " is an admin.\n") + return True + sys.stdout.write(sender + " is NOT an admin.\n") + return False + + def join_room(self, room): + sys.stdout.write("Joining room: " + room + "\n") + self.jabber.sendPresence(jid=room + "/" + self.nick) + + def join_rooms(self): + for room in self.rooms: + self.join_room(room) + + def announce(self, message): + for room in self.rooms: + self.send_message(room, message, typ="groupchat") + + def load_config(self): + try: + with open("botconfig.json", "r") as file: + config = json.load(file) + self.rooms = config.get("rooms") or "" + self.quotesfile = config.get("quotesfile") or "" + self.nick = config.get("nick") or "defaultnick" + self.eightBallResponses = config.get("eightBallResponses") or [] + self.admins = config.get("admins") or [] + self.jabberid = config.get("jabberid") + self.password = config.get("password") + if not self.jabberid or not self.password: + sys.stderr.write("Missing values for jabberid and password, check your config file!\n") + sys.exit(1) + except Exception as e: + sys.stderr.write("Could not open config file: " + str(e) + "\n") + sys.exit(1) + + def save_config(self): + with open("botconfig.json", "w") as file: + config = { + "admins": self.admins, + "eightBallResponses": self.eightBallResponses, + "nick": self.nick, + "rooms": self.rooms + } + json.dump(config, file, indent=4) + + def xmpp_connect(self): + self.load_config() + jid = xmpp.protocol.JID(self.jabberid) + self.jabber = xmpp.Client(jid.getDomain(), debug = []) + connection = self.jabber.connect() + if not connection: + sys.stderr.write('Could not connect!\n') + return False + sys.stdout.write('Connected with %s\n'%connection) + self.connection = connection + auth = self.jabber.auth(jid.getNode(), self.password, resource=jid.getResource()) + if not auth: + sys.stderr.write('Could not authenticate!\n') + return False + sys.stdout.write('Authenticated using %s\n'%auth) + self.register_handlers() + self.jabber.sendInitPresence() + self.join_rooms() + return connection def xmpp_message(self, con, event): eventType = event.getType() messageDelayed = event.getTags("delay") or "" messageBody = event.getBody() or "" messageFrom = event.getFrom().getStripped() or "" - sys.stdout.write("==========\n") - sys.stdout.write(str(event) + '\n\n') - sys.stdout.write("DELAYED: " + str(messageDelayed == "") + '\n') - sys.stdout.write("TYPE: " + str(eventType) + '\n') - sys.stdout.write("MESG: " + str(messageBody) + '\n') - sys.stdout.write("FROM: " + str(messageFrom) + '\n') - sys.stdout.write("==========\n") + # Very useful for debug + # sys.stdout.write(str(event) + '\n\n') + # sys.stdout.write("==========\n") + # sys.stdout.write("DELAYED: " + str(messageDelayed == "") + '\n') + # sys.stdout.write("TYPE: " + str(eventType) + '\n') + # sys.stdout.write("MESG: " + str(messageBody) + '\n') + # sys.stdout.write("FROM: " + str(messageFrom) + '\n') + # sys.stdout.write("==========\n") if eventType in ["chat", "groupchat"] and not messageDelayed: if messageBody == ".tbhelp": sys.stdout.write("CAUGHT COMMAND: .tbhelp\n") self.command_help(messageFrom, eventType) - if messageBody == ".tbversion": + elif messageBody == ".tbversion": sys.stdout.write("CAUGHT COMMAND: .tbversion\n") self.command_version(messageFrom, eventType) - elif messageBody.find("invites you to the room") != -1: - sys.stdout.write("Invite to room received. Joining: " + messageFrom + "\n") - self.join_room(messageFrom + "/testbot") - return - - def join_room(self, room): - self.jabber.sendPresence(jid=room, typ=None) - return + elif messageBody == ".tbquote": + sys.stdout.write("CAUGHT COMMAND: .tbquote\n") + self.command_quote(messageFrom, eventType) + elif messageBody.startswith(".tbtime"): + sys.stdout.write("CAUGHT COMMAND: .tbtime\n") + self.command_time(messageFrom, eventType, messageBody) + elif messageBody.startswith(".tbdiceroll"): + sys.stdout.write("CAUGHT COMMAND: .tbdiceroll\n") + self.command_diceroll(messageFrom, eventType, messageBody) + elif messageBody.startswith(".tbweather"): + sys.stdout.write("CAUGHT COMMAND: .tbweather\n") + self.command_weather(messageFrom, eventType, messageBody) + elif messageBody.startswith(".tb8ball"): + sys.stdout.write("CAUGHT COMMAND: .tb8ball\n") + self.command_8ball(messageFrom, eventType, messageBody) + elif messageBody.startswith(".tbping"): + sys.stdout.write("CAUGHT COMMAND: .tbping\n") + self.command_ping(messageFrom, eventType) + if eventType in ["chat", None]: + if messageBody.find("invites you to the room") != -1: + sys.stdout.write("INVITED TO ROOM.") + self.join_room(messageFrom) + elif messageBody == ".tbrestart": + sys.stdout.write("CAUGHT COMMAND: .tbrestart\n") + sys.stdout.write("Checking if " + messageFrom + " is an admin...\n") + if self.check_admin(messageFrom): + self.command_restart(messageFrom, eventType) + elif messageBody == ".tbsaveconfig": + sys.stdout.write("CAUGHT COMMAND: .tbsaveconfig\n") + sys.stdout.write("Checking if " + messageFrom + " is an admin...\n") + if self.check_admin(messageFrom): + self.command_saveconfig(messageFrom, eventType) + elif messageBody.startswith(".tbaddadmin"): + sys.stdout.write("CAUGHT COMMAND: .tbaddadmin\n") + sys.stdout.write("Checking if " + messageFrom + " is an admin...\n") + if self.check_admin(messageFrom): + self.command_addadmin(messageFrom, eventType, messageBody) + elif messageBody.startswith(".tbremoveadmin"): + sys.stdout.write("CAUGHT COMMAND: .tbremoveadmin\n") + sys.stdout.write("Checking if " + messageFrom + " is an admin...\n") + if self.check_admin(messageFrom): + self.command_removeadmin(messageFrom, eventType, messageBody) def command_help(self, recipient, typ): - message = "Available commands: .tbhelp, .tbversion" + message = "Available commands: .tbhelp, .tbversion, .tbping, .tbquote, .tbtime, .tbdiceroll, .tbweather, .tb8ball" + if self.check_admin(recipient): + message = message + "\nAdmin commands: .tbrestart, .tbsaveconfig, .tbaddadmin, .tbremoveadmin" self.send_message(recipient, message, typ) - return def command_version(self, recipient, typ): - message = "XMPP Testbot version 0.1.0 by Jake Bauer" + message = "XMPP Python Bot (XPB) v0.3.0 by Jake Bauer." self.send_message(recipient, message, typ) - return - def send_message(self, recipient, message, typ): - m = xmpp.protocol.Message(to=recipient,body=message,typ=typ) - self.jabber.send(m) - return + def command_quote(self, recipient, typ): + try: + with open(self.quotesfile, "r") as f: + lines = f.read().splitlines() + selectedLine = random.choice(lines) + self.send_message(recipient, selectedLine, typ) + except Exception as e: + sys.stderr.write("Could not open quotes file: " + str(e)) + self.send_message(recipient, "Error, could not access quotes.", typ) - def xmpp_connect(self): - con = self.jabber.connect() - if not con: - sys.stderr.write('Could not connect!\n') - return False - sys.stderr.write('Connected with %s\n'%con) - auth=self.jabber.auth(jid.getNode(),password,resource=jid.getResource()) - if not auth: - sys.stderr.write('Could not authenticate!\n') - return False - sys.stderr.write('Authenticated using %s\n'%auth) - self.register_handlers() - return con + def command_time(self, recipient, typ, messageBody): + try: + message = datetime.now(timezone(messageBody.split(' ')[1])).strftime('%Y-%m-%d %H:%M:%S') + except: + message = "Please specify a valid timezone" + self.send_message(recipient, message, typ) + + def command_diceroll(self, recipient, typ, messageBody): + try: + message = str(random.randint(1, int(messageBody.split(' ')[1]))) + except: + message = "Please specify a maximum number (e.g. .tbdiceroll 20)." + self.send_message(recipient, message, typ) + + def command_weather(self, recipient, typ, messageBody): + try: + city = messageBody.split(' ')[1] + url = "https://wttr.in/" + city + "?m&format=3" + response = requests.get(url) + message = response.text.strip('\n\r') + except: + message = "Please specify a city (e.g. .tbweather Toronto)." + self.send_message(recipient, message, typ) + + def command_8ball(self, recipient, typ, messageBody): + try: + messageBody.split(' ')[1] + if not len(self.eightBallResponses): + message = "The bot has not been configured with 8-Ball responses." + else: + message = random.choice(self.eightBallResponses) + except: + message = "Please ask me a question.\n" + self.send_message(recipient, message, typ) + + def command_ping(self, recipient, typ): + message = "PONG" + self.send_message(recipient, message, typ) + + def command_restart(self, recipient, typ): + global restarting + global invoker + restarting = True + invoker = recipient + message = "Restarting..." + self.send_message(recipient, message, typ) + self.announce("Restarting...") + + def command_saveconfig(self, recipient, typ): + message = "Saving config..." + self.send_message(recipient, message, typ) + if self.save_config(): + message = "Config saved!" + else: + message = "Error saving config!" + self.send_message(recipient, message, typ) + + def command_addadmin(self, recipient, typ, messageBody): + try: + newAdmin = messageBody.split(' ')[1] + if not re.match(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", newAdmin): + raise InvalidJIDError + self.admins.append(newAdmin) + message = newAdmin + " has been added as an admin. Issue .tbsaveconfig to make this change permanent." + except Exception as e: + print(e) + message = "Please specify a valid Jabber ID to be added as an admin." + self.send_message(recipient, message, typ) -jid = xmpp.protocol.JID(jabberid) -client = xmpp.Client(jid.getDomain(), debug = []) + def command_removeadmin(self, recipient, typ, messageBody): + try: + oldAdmin = messageBody.split(' ')[1] + self.admins.remove(oldAdmin) + message = oldAdmin + " has been removed as an admin. Issue .tbsaveconfig to make this change permanent." + except: + message = "User was not an admin or no JID given." + self.send_message(recipient, message, typ) -bot = Bot(client) -if not bot.xmpp_connect(): - sys.stderr.write("Could not connect to server.\n") - sys.exit(1) +def start_bot(): + global bot + bot = Bot() + if not bot.xmpp_connect(): + sys.stderr.write("Could not connect to server.\n") + sys.exit(1) + bot.announce("Bot online!") -client.sendInitPresence() -online = 1 -while online: - if not client.isConnected(): - client.reconnectAndReauth() - client.Process(1) +start_bot() +while True: + if restarting: + bot.jabber.disconnect() + start_bot() + bot.send_message(invoker, "Bot restarted!", typ="chat") + restarting = False + try: + if not bot.jabber.isConnected(): + bot.jabber.reconnectAndReauth() + bot.jabber.Process(1) + except Exception as e: + sys.stderr.write(str(e) + "\n") diff --git a/botconfig.default.json b/botconfig.default.json @@ -0,0 +1,9 @@ +{ + "jabberid": "", + "password": "", + "quotesfile": "", + "admins": [], + "nick": "", + "rooms": [], + "eightBallResponses": [] +}