From 90aa399bfb5cfd3f1b34fb7011a6ea5b5d35c659 Mon Sep 17 00:00:00 2001 From: klemek Date: Fri, 8 Jan 2021 12:16:18 +0100 Subject: [PATCH] ultimate factorisation --- bot.py | 6 +- data_types.py | 79 +++++++++++++ emotes.py | 303 ++++++++++++-------------------------------------- scan.py | 0 scanner.py | 131 ++++++++++++++++++++++ utils.py | 1 - 6 files changed, 282 insertions(+), 238 deletions(-) create mode 100644 data_types.py delete mode 100644 scan.py create mode 100644 scanner.py diff --git a/bot.py b/bot.py index 26856cf..a427025 100755 --- a/bot.py +++ b/bot.py @@ -2,7 +2,7 @@ from miniscord import Bot import logging import emojis -import emotes +from emotes import EmotesScanner logging.basicConfig( format="[%(asctime)s][%(levelname)s][%(module)s] %(message)s", level=logging.INFO @@ -19,8 +19,8 @@ bot.log_calls = True bot.client.bot = bot # TODO place in miniscord bot.register_command( "emotes", # command text (regex) - emotes.compute, # command function + lambda *args: EmotesScanner().compute(*args), # command function "emotes: Emotes analysis", # short help - emotes.HELP, + EmotesScanner.help(), ) bot.start() diff --git a/data_types.py b/data_types.py new file mode 100644 index 0000000..d2e0a46 --- /dev/null +++ b/data_types.py @@ -0,0 +1,79 @@ +from typing import List, Optional +from datetime import datetime +from collections import defaultdict +import discord + +# Custom libs + +from utils import mention, plural, day_interval + + +class Emote: + """ + Custom class to store emotes data + """ + + def __init__(self, emoji: Optional[discord.Emoji] = None): + self.emoji = emoji + self.usages = 0 + self.reactions = 0 + self.last_used = None + self.members = defaultdict(int) + + def update_use(self, date: datetime, members_id: List[int]): + """ + Update last use date if more recent and last member + """ + if self.last_used is None or date > self.last_used: + self.last_used = date + for member_id in members_id: + self.members[member_id] += 1 + + def used(self) -> bool: + return self.usages > 0 or self.reactions > 0 + + def score(self) -> float: + # Score is compose of usages + reactions + # When 2 emotes have the same score, + # the days since last use is stored in the digits + # (more recent first) + return self.usages + self.reactions + 1 / (100000 * (self.use_days() + 1)) + + def life_days(self) -> int: + return (datetime.today() - self.emoji.created_at).days + + def use_days(self) -> int: + # If never used, use creation date instead + if self.last_used is None: + return self.life_days() + else: + return (datetime.today() - self.last_used).days + + def get_top_member(self) -> int: + return sorted(self.members.keys(), key=lambda id: self.members[id])[-1] + + def to_string(self, i: int, name: str, show_life: bool, show_members: bool) -> str: + # place + output = "" + if i == 0: + output += ":first_place:" + elif i == 1: + output += ":second_place:" + elif i == 2: + output += ":third_place:" + else: + output += f"**#{i + 1}**" + output += f" {name} - " + if not self.used(): + output += "never used " + else: + output += f"{plural(self.usages, 'time')} " + if self.reactions >= 1: + output += f"and {plural(self.usages, 'reaction')} " + if show_life and not self.default: + output += f"(in {plural(self.life_days(), 'day')}) " + if self.used(): + output += f"(last used {day_interval(self.use_days())})" + if show_members: + output += f" (mostly by {mention(self.get_top_member())}: {self.members[self.get_top_member()]})" + return output diff --git a/emotes.py b/emotes.py index 93940b8..8bfbf70 100644 --- a/emotes.py +++ b/emotes.py @@ -1,203 +1,87 @@ -from typing import Dict, List, Optional -from datetime import datetime -from collections import defaultdict +from typing import Dict, List import discord # Custom libs -from utils import no_duplicate, mention, plural, day_interval, get_intro -from log_manager import GuildLogs, MessageLog +from log_manager import ChannelLogs, MessageLog +from data_types import Emote +from scanner import Scanner import emojis -# CONSTANTS -CHUNK_SIZE = 1000 - -# MAIN - -HELP = ( - "```\n" - + "%emotes : Rank emotes by their usage\n" - + "arguments:\n" - + "* @member : filter for one or more member\n" - + "* #channel : filter for one or more channel\n" - + "* : top emojis, default is 20\n" - + "* all : list all common emojis in addition to this guild's\n" - + "* members : show top member for each emote\n" - + "Example: %emotes 10 all #mychannel1 #mychannel2 @user\n" - + "```" -) +async def compute(self, *args): + EmotesScanner().compute(*args) -async def compute(client: discord.client, message: discord.Message, *args: str): - """ - Computes the %emotes command - """ - guild = message.guild - logs = GuildLogs(guild) +class EmotesScanner(Scanner): + @staticmethod + def help() -> str: + return "```\n" + +"%emotes : Rank emotes by their usage\n" + +"arguments:\n" + +"* @member : filter for one or more member\n" + +"* #channel : filter for one or more channel\n" + +"* : top emojis, default is 20\n" + +"* all : list all common emojis in addition to this guild's\n" + +"* members : show top member for each emote\n" + +"Example: %emotes 10 all #mychannel1 #mychannel2 @user\n" + +"```" - # If "%emotes help" redirect to "%help emotes" - if "help" in args: - await client.bot.help(client, message, "help", "emotes") - return + def __init__(self): + super().__init__( + has_digit_args=True, + valid_args=["all", "members"], + help=EmotesScanner.help(), + intro_context="emotes usage", + ) + self.top = 20 + self.all_emojis = False + self.show_members = False - # check args validity - str_channel_mentions = [channel.mention for channel in message.channel_mentions] - str_mentions = [member.mention for member in message.mentions] - for arg in args[1:]: - if ( - arg not in ["all", "members"] - and not arg.isdigit() - and arg not in str_channel_mentions - and arg not in str_mentions - ): - await message.channel.send(f"Unrecognized argument: `{arg}`") - return + async def compute_args(self, message: discord.Message, *args: str) -> bool: + # get max emotes to view + self.top = 20 + for arg in args: + if arg.isdigit(): + self.top = int(arg) + # check other args + self.all_emojis = "all" in args + self.show_members = "members" in args and ( + len(self.members) == 0 or len(self.members) > 1 + ) + return True - # Create emotes dict from custom emojis of the guild - emotes = defaultdict(Emote) - for emoji in guild.emojis: - emotes[str(emoji)] = Emote(emoji) + def compute_message(self, channel: ChannelLogs, message: MessageLog): + return analyse_message( + message, self.emotes, self.raw_members, all_emojis=self.all_emojis + ) - # Get selected channels or all of them if no channel arguments - channels = no_duplicate(message.channel_mentions) - full = len(channels) == 0 - if full: - channels = guild.text_channels - - # Get selected members - members = no_duplicate(message.mentions) - raw_members = no_duplicate(message.raw_mentions) - - # get max emotes to view - top = 20 - for arg in args: - if arg.isdigit(): - top = int(arg) - - # check other args - all_emojis = "all" in args - show_members = "members" in args and (len(members) == 0 or len(members) > 1) - - # Start computing data - async with message.channel.typing(): - progress = await message.channel.send("```Starting analysis...```") - total_msg, total_chan = await logs.load(progress, channels) - if total_msg == -1: - await message.channel.send( - f"{message.author.mention} An analysis is already running on this server, please be patient." + def get_results(self, intro: str) -> List[str]: + names = [name for name in self.emotes] + names.sort(key=lambda name: self.emotes[name].score(), reverse=True) + names = names[: self.top] + res = [intro] + allow_unused = self.full and len(self.members) == 0 + res += [ + self.emotes[name].to_string( + names.index(name), name, False, self.show_members ) - else: - msg_count = 0 - chan_count = 0 - for id in logs.channels: - count = sum( - [ - analyse_message( - messagelog, emotes, raw_members, all_emojis=all_emojis - ) - for messagelog in logs.channels[id].messages - ] - ) - msg_count += count - chan_count += 1 if count > 0 else 0 - await progress.edit(content="```Computing results...```") - # Display results - await tell_results( - get_intro( - "emotes usage", - emotes, - full, - channels, - members, - msg_count, - chan_count, - ), - emotes, - message.channel, - total_msg, - top=top, - allow_unused=full and len(members) == 0, - show_life=False, - show_members=show_members, - ) - # Delete custom progress message - await progress.delete() - - -# CLASSES - - -class Emote: - """ - Custom class to store emotes data - """ - - def __init__(self, emoji: Optional[discord.Emoji] = None): - self.emoji = emoji - self.usages = 0 - self.reactions = 0 - self.last_used = None - self.members = defaultdict(int) - - def update_use(self, date: datetime, members_id: List[int]): - """ - Update last use date if more recent and last member - """ - if self.last_used is None or date > self.last_used: - self.last_used = date - for member_id in members_id: - self.members[member_id] += 1 - - def used(self) -> bool: - return self.usages > 0 or self.reactions > 0 - - def score(self) -> float: - # Score is compose of usages + reactions - # When 2 emotes have the same score, - # the days since last use is stored in the digits - # (more recent first) - return self.usages + self.reactions + 1 / (100000 * (self.use_days() + 1)) - - def life_days(self) -> int: - return (datetime.today() - self.emoji.created_at).days - - def use_days(self) -> int: - # If never used, use creation date instead - if self.last_used is None: - return self.life_days() - else: - return (datetime.today() - self.last_used).days - - def get_top_member(self) -> int: - return sorted(self.members.keys(), key=lambda id: self.members[id])[-1] - - def to_string(self, i: int, name: str, show_life: bool, show_members: bool) -> str: - # place - output = "" - if i == 0: - output += ":first_place:" - elif i == 1: - output += ":second_place:" - elif i == 2: - output += ":third_place:" - else: - output += f"**#{i + 1}**" - output += f" {name} - " - if not self.used(): - output += "never used " - else: - output += f"{plural(self.usages, 'time')} " - if self.reactions >= 1: - output += f"and {plural(self.usages, 'reaction')} " - if show_life and not self.default: - output += f"(in {plural(self.life_days(), 'day')}) " - if self.used(): - output += f"(last used {day_interval(self.use_days())})" - if show_members: - output += f" (mostly by {mention(self.get_top_member())}: {self.members[self.get_top_member()]})" - return output + for name in names + if allow_unused or self.emotes[name].used() + ] + # Get the total of all emotes used + usage_count = 0 + reaction_count = 0 + for name in self.emotes: + usage_count += self.emotes[name].usages + reaction_count += self.emotes[name].reactions + res += [ + f"Total: {usage_count:,} times ({usage_count / self.msg_count:.4f} / message)" + ] + if reaction_count > 0: + res[-1] += f" and {reaction_count:,} reactions" + return res # ANALYSIS @@ -238,52 +122,3 @@ def analyse_message( emotes[name].reactions += 1 emotes[name].update_use(message.created_at, [member]) return impacted - - -# RESULTS - - -async def tell_results( - intro: str, # introduction sentence (from get_intro) - emotes: Dict[str, Emote], - channel: discord.TextChannel, - nmm: int, # number of impacted messages - top: int, # top n emojis - *, - allow_unused: bool, - show_life: bool, - show_members: bool, -): - names = [name for name in emotes] - names.sort(key=lambda name: emotes[name].score(), reverse=True) - names = names[:top] - res = [intro] - res += [ - emotes[name].to_string(names.index(name), name, show_life, show_members) - for name in names - if allow_unused or emotes[name].used() - ] - res += [get_total(emotes, nmm)] - response = "" - for r in res: - if len(response + "\n" + r) > 2000: - await channel.send(response) - response = "" - response += "\n" + r - if len(response) > 0: - await channel.send(response) - - -def get_total(emotes: Dict[str, Emote], nmm: int) -> str: - """ - Get the total of all emotes used - """ - nu = 0 - nr = 0 - for name in emotes: - nu += emotes[name].usages - nr += emotes[name].reactions - if nr > 0: - return f"Total: {nu:,} times ({nu / nmm:.4f} / message) and {nr:,} reactions" - else: - return f"Total: {nu:,} times ({nu / nmm:.4f} / message)" diff --git a/scan.py b/scan.py deleted file mode 100644 index e69de29..0000000 diff --git a/scanner.py b/scanner.py new file mode 100644 index 0000000..6045260 --- /dev/null +++ b/scanner.py @@ -0,0 +1,131 @@ +from abc import ABC, abstractmethod +from typing import List +from collections import defaultdict +import discord + +from utils import no_duplicate, get_intro +from log_manager import GuildLogs, ChannelLogs, MessageLog +from data_types import Emote + + +class Scanner(ABC): + def __init__( + self, + *, + has_digit_args: bool, + valid_args: List[str], + help: str, + intro_context: str, + ): + self.has_digit_args = has_digit_args + self.valid_args = valid_args + self.help = help + self.intro_context = intro_context + + self.members = [] + self.raw_members = [] + self.full = False + self.channels = [] + + self.msg_count = 0 + self.chan_count = 0 + + async def compute( + self, client: discord.client, message: discord.Message, *args: str + ): + guild = message.guild + logs = GuildLogs(guild) + + # If "%cmd help" redirect to "%help cmd" + if "help" in args: + await client.bot.help(client, message, "help", args[0]) + return + + # check args validity + str_channel_mentions = [channel.mention for channel in message.channel_mentions] + str_mentions = [member.mention for member in message.mentions] + for arg in args[1:]: + if ( + arg not in self.valid_args + and (not arg.isdigit() or not self.has_digit_args) + and arg not in str_channel_mentions + and arg not in str_mentions + ): + await message.channel.send( + f"{message.author.mention} unrecognized argument: `{arg}`" + ) + return + + # Create emotes dict from custom emojis of the guild + self.emotes = defaultdict(Emote) + for emoji in guild.emojis: + self.emotes[str(emoji)] = Emote(emoji) + + # Get selected channels or all of them if no channel arguments + self.channels = no_duplicate(message.channel_mentions) + self.full = len(self.channels) == 0 + if self.full: + self.channels = guild.text_channels + + # Get selected members + self.members = no_duplicate(message.mentions) + self.raw_members = no_duplicate(message.raw_mentions) + + if not await self.compute_args(message, *args): + return + + # Start computing data + async with message.channel.typing(): + progress = await message.channel.send("```Starting analysis...```") + total_msg, total_chan = await logs.load(progress, self.channels) + if total_msg == -1: + await message.channel.send( + f"{message.author.mention} An analysis is already running on this server, please be patient." + ) + else: + self.msg_count = 0 + self.chan_count = 0 + for channel in self.channels: + channel_logs = logs.channels[channel.id] + count = sum( + [ + self.compute_message(channel_logs, message_log) + for message_log in channel_logs.messages + ] + ) + self.msg_count += count + self.chan_count += 1 if count > 0 else 0 + await progress.edit(content="```Computing results...```") + # Display results + results = self.get_results( + get_intro( + self.intro_context, + self.full, + self.channels, + self.members, + self.msg_count, + self.chan_count, + ) + ) + response = "" + for r in results: + if len(response + "\n" + r) > 2000: + await message.channel.send(response) + response = "" + response += "\n" + r + if len(response) > 0: + await message.channel.send(response) + # Delete custom progress message + await progress.delete() + + @abstractmethod + async def compute_args(self, message: discord.Message, *args: str) -> bool: + pass + + @abstractmethod + def compute_message(self, channel: ChannelLogs, message: MessageLog) -> bool: + pass + + @abstractmethod + def get_results(self, intro: str): + pass diff --git a/utils.py b/utils.py index db2ac6d..3c19706 100644 --- a/utils.py +++ b/utils.py @@ -76,7 +76,6 @@ def day_interval(interval: int) -> str: def get_intro( subject: str, - emotes: Dict[str, object], full: bool, channels: List[discord.TextChannel], members: List[discord.Member],