From 339dfc30ef23bfc6b28125d807d1755eaa858270 Mon Sep 17 00:00:00 2001 From: klemek Date: Mon, 20 Jul 2020 23:26:42 +0200 Subject: [PATCH] initial commit --- .gitignore | 6 ++ bot.py | 241 +++++++++++++++++++++++++++++++++++++++++++++++ main.py | 8 ++ requirements.txt | 2 + 4 files changed, 257 insertions(+) create mode 100644 .gitignore create mode 100644 bot.py create mode 100644 main.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..738e240 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea +venv +__pycache__ +.env +error_* +*.log diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..58a0af3 --- /dev/null +++ b/bot.py @@ -0,0 +1,241 @@ +from typing import List, Callable, Coroutine, Tuple, Any +import time +import logging +import traceback +import discord +import re +import asyncio +import random +import os +from datetime import datetime +from dotenv import load_dotenv + + +CommandFunction = Callable[[discord.Client, discord.Message, Tuple[str]], Coroutine[Any, Any, None]] + + +def debug(message: discord.Message, txt: str): + logging.info(f"{message.guild} > #{message.channel}: {txt}") + + +async def delete_message(message: discord.Message) -> bool: + try: + await message.delete() + return True + except discord.Forbidden: + pass + except discord.NotFound: + pass + return False + + +def message_id(message: discord.Message) -> str: + is_direct = message.channel.type == discord.ChannelType.private + if not is_direct: + return f'{message.guild.id}/{message.channel.id}/{message.author.id}' + else: + return message.author.id + + +def sanitize_input(src: str) -> str: + return re.sub(r'[^A-Za-z0-9 _]', "", src.lower().strip()) + + +args_regex = re.compile('"([^"]*)"|\'([^\']*)\'|([^ ]+)') + + +def parse_arguments(src: str) -> List[str]: + def get_found_match(m: list) -> str: + f = [g for g in m if len(g) > 0] + if len(f) > 0: + return f[0] + return "" + + return [get_found_match(m) for m in args_regex.findall(src)] + + +class Command(object): + def __init__(self, regex: str, compute: CommandFunction, help_short: str, help_long: str): + self.regex = regex + self.compute = compute + self.help_short = help_short + self.help_long = help_long + + +class Bot(object): + def __init__(self, app_name: str, version: str, *, alias: str = None): + # constants + self.token_env_var = "DISCORD_TOKEN" + self.remove_mentions = True # remove mentions from arguments + self.alias = alias # can call bot with {alias}{command_name} + self.any_mention = False # bot mention can be anywhere + self.log_calls = False + self.guild_logs_file = "guilds.log" + self.enforce_write_permission = True + self.lower_command_names = True + self.game_change_delay = 10 + self.error_restart_delay = 2 + # config vars + self.app_name = app_name + self.version = version + # future vars + self.__token = None + self.__t0 = None + self.__last_error = None + # init + self.__commands = [] + self.games = [f"v{version}", + lambda:f"{len(self.client.guilds)} guilds"] + if self.alias is not None: + self.games += [f"{self.alias}help"] + self.client = discord.Client() + self.__register_events() + self.__register_commands() + + def __register_events(self): + self.on_ready = self.client.event(self.on_ready) + self.on_message = self.client.event(self.on_message) + self.on_guild_join = self.client.event(self.on_guild_join) + self.on_guild_remove = self.client.event(self.on_guild_remove) + + def __register_commands(self): + # register default commands + tmp_alias = '' if self.alias is None else self.alias + self.register_command("(help|h)", self.help, "help: show this help", + f"```\n" + f"* {tmp_alias}help\n" + f"\tShows the list of commands.\n" + f"* {tmp_alias}help [command]\n" + f"\tShows help about a specific command.\n" + f"```") + self.register_command("(info|about)", self.info, "info: show description", + f"```\n" + f"* {tmp_alias}info:\n" + f"\tShows this bot's status.\n" + f"```") + + def __generate_game(self) -> str: + game = random.choice(self.games) + if callable(game): + return game() + else: + return game + + async def info(self, _client: discord.client, message: discord.Message): + await message.channel.send(f"```\n" + f"{self.app_name} v{self.version}\n" + f"* Started at {self.__t0:%Y-%m-%d %H:%M}\n" + f"* Connected to {len(self.client.guilds)} guilds\n" + f"```") + + async def help(self, _client: discord.client, message: discord.Message, *args: str): + if len(args) <= 1: + tmp_alias = '' if self.alias is None else self.alias + await message.channel.send( + f"```\n" + f"List of available commands:\n" + + "".join([f"* {tmp_alias}{command.help_short}\n" for command in self.__commands]) + + f"```") + else: + for command in self.__commands: + if re.match(command.regex, args[1].lower() if self.lower_command_names else args[1]): + await message.channel.send(command.help_long) + return + await message.channel.send(f"Command `{sanitize_input(args[1])}` not found") + + async def on_ready(self): + # Change status + logging.info(f"{self.client.user} (v{self.version}) has connected to {len(self.client.guilds)} Discord guilds") + if self.guild_logs_file is not None and not os.path.exists(self.guild_logs_file): + for guild in self.client.guilds: + await self.on_guild_join(guild) + while True: + await self.client.change_presence( + activity=discord.Game(self.__generate_game()), + status=discord.Status.online + ) + await asyncio.sleep(self.game_change_delay) + + async def on_message(self, message: discord.Message): + if message.author == self.client.user: + return # Ignore self messages + + is_direct = message.channel.type == discord.ChannelType.private + + is_mention = self.any_mention and self.client.user in message.mentions \ + or bool(re.match(f"^<@!?{self.client.user.id}>", message.content)) + + if self.remove_mentions: + message.content = re.sub(r"<@!?[^>]+>", "", message.content) + elif is_mention: + message.content = re.sub(f"^<@!?{self.client.user.id}>", "", message.content) + + command_args = parse_arguments(message.content) + + if len(command_args) == 0: + return # Empty message + + is_alias = self.alias is not None and command_args[0].startswith(self.alias) + if is_alias: # remove alias from first arg + command_args[0] = command_args[0][len(self.alias):] + + if not is_direct and not is_mention and not is_alias: + return # Not for the bot + + for command in self.__commands: + if re.match(command.regex, command_args[0].lower() if self.lower_command_names else command_args[0]): + if self.log_calls: + debug(message, str(command_args)) + + if not is_direct and self.enforce_write_permission: + # Check if bot can respond on current channel or DM user + permissions = message.channel.permissions_for(message.guild.me) + if not permissions.send_messages: + await message.author.create_dm() + await message.author.dm_channel.send( + f"Hi, this bot doesn\'t have the permission to send a message to" + f" #{message.channel} in server '{message.guild}'") + return + await command.compute(self.client, message, *command_args) + + async def on_guild_join(self, guild: discord.guild): + if self.guild_logs_file is not None: + with open(self.guild_logs_file, encoding="utf-8", mode="a") as f: + f.write(f"{datetime.now():%Y-%m-%d %H:%M} +{guild.id}: {guild.name}\n") + + async def on_guild_remove(self, guild: discord.guild): + if self.guild_logs_file is not None: + with open(self.guild_logs_file, encoding="utf-8", mode="a") as f: + f.write(f"{datetime.now():%Y-%m-%d %H:%M} -{guild.id}: {guild.name}\n") + pass + + def register_command(self, regex: str, compute: CommandFunction, help_short: str, help_long: str): + self.__commands += [Command(regex, compute, help_short, help_long)] + + def start(self): + logging.info(f"Current PID: {os.getpid()}") + env_file_found = load_dotenv() + self.__token = os.getenv(self.token_env_var) + if self.__token is None: + if env_file_found: + raise EnvironmentError(f"No token was loaded, please verify your .env file has '{self.token_env_var}'") + else: + raise EnvironmentError(f"No environment variable '{self.token_env_var}' found") + self.__t0 = datetime.now() + # Launch client and rerun on errors + while True: + try: + self.client.run(self.__token) + break # clean kill + except Exception as e: + t = datetime.now() + logging.error(f"Exception raised : {repr(e)}") + if repr(e) != self.__last_error: + self.__last_error = repr(e) + filename = f"error_{t:%Y-%m-%d_%H-%M-%S}.txt" + with open(filename, 'w') as f: + f.write(f"{self.app_name} v{self.version} started at {self.__t0:%Y-%m-%d %H:%M}\r\n" + f"Exception raised at {t:%Y-%m-%d %H:%M}\r\n" + f"\r\n" + f"{traceback.format_exc()}") + time.sleep(self.error_restart_delay) diff --git a/main.py b/main.py new file mode 100644 index 0000000..852955d --- /dev/null +++ b/main.py @@ -0,0 +1,8 @@ +import logging +from bot import Bot + +logging.basicConfig(format="[%(asctime)s][%(levelname)s][%(module)s] %(message)s", level=logging.INFO) + +bot = Bot("test-app", "0.1-alpha", alias="|") +bot.log_calls = True +bot.start() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c47d7a8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +discord +python-dotenv \ No newline at end of file