code into packages

This commit is contained in:
klemek
2021-01-08 12:57:23 +01:00
parent 038b1a294c
commit 5a965bbc7a
14 changed files with 12 additions and 11 deletions
View File
+79
View File
@@ -0,0 +1,79 @@
from typing import List, Optional
from datetime import datetime
from collections import defaultdict
import discord
# Custom libs
from .utils import mention, plural, day_interval
class Emote:
    """
    Custom class to store emotes data
    """

    def __init__(self, emoji: Optional[discord.Emoji] = None):
        """
        :param emoji: emoji object backing this entry, or None when there is none
        """
        self.emoji = emoji
        # Counters and last use date are maintained by the code using this class
        self.usages = 0
        self.reactions = 0
        self.last_used = None
        # member id -> number of times update_use() was called with this id
        self.members = defaultdict(int)
        # Without an emoji object there is no creation date, so life_days()
        # cannot be computed: to_string() skips it when this flag is set.
        # NOTE(review): presumably "no emoji object" means a built-in emoji
        self.default = emoji is None

    def update_use(self, date: datetime, members_id: List[int]):
        """
        Update last use date if more recent and count one use per given member
        """
        if self.last_used is None or date > self.last_used:
            self.last_used = date
        for member_id in members_id:
            self.members[member_id] += 1

    def used(self) -> bool:
        """
        Tell if the emote was used at least once (in any way)
        """
        return self.usages > 0 or self.reactions > 0

    def score(self) -> float:
        """
        Get the sorting score of the emote
        """
        # Score is composed of usages + reactions
        # When 2 emotes have the same score,
        # the days since last use is stored in the digits
        # (more recent first)
        return self.usages + self.reactions + 1 / (100000 * (self.use_days() + 1))

    def life_days(self) -> int:
        """
        Get the number of days since the emoji creation (requires an emoji object)
        """
        return (datetime.today() - self.emoji.created_at).days

    def use_days(self) -> int:
        """
        Get the number of days since the last use
        """
        # If never used, use creation date instead
        if self.last_used is None:
            return self.life_days()
        return (datetime.today() - self.last_used).days

    def get_top_member(self) -> int:
        """
        Get the id of the member with the highest count

        :raises IndexError: when no member was registered
        """
        # Stable sort: on a tie the most recently registered member wins
        return sorted(self.members, key=self.members.get)[-1]

    def to_string(self, i: int, name: str, show_life: bool, show_members: bool) -> str:
        """
        Build the ranking line of this emote

        :param i: zero-based rank of the emote
        :param name: display name of the emote
        :param show_life: also show the emote age (skipped for default emotes)
        :param show_members: also show the member using it the most
        """
        # place
        medals = (":first_place:", ":second_place:", ":third_place:")
        output = medals[i] if 0 <= i < len(medals) else f"**#{i + 1}**"
        output += f" {name} - "
        if not self.used():
            output += "never used "
        else:
            output += f"{plural(self.usages, 'time')} "
            if self.reactions >= 1:
                # Fixed: the reaction count was printed from self.usages
                output += f"and {plural(self.reactions, 'reaction')} "
        if show_life and not self.default:
            output += f"(in {plural(self.life_days(), 'day')}) "
        if self.used():
            output += f"(last used {day_interval(self.use_days())})"
            # No member registered means there is no top member to show
            if show_members and self.members:
                top_member = self.get_top_member()
                output += f" (mostly by {mention(top_member)}: {self.members[top_member]})"
        return output
File diff suppressed because one or more lines are too long
+87
View File
@@ -0,0 +1,87 @@
import re
import json
import logging
from os import path
# Additional short names, mapped to their hyphen-separated hexadecimal
# code points. load_emojis() appends them after the entries of emoji.json,
# so an entry here takes over the short code of the same sequence.
EXTRA_EMOJI = {
    "thumbup": "1f44d",
    "thumbdown": "1f44e",
    "timer": "23f2-fe0f",
    "cowboy": "1f920",
    "clown": "1f921",
    "newspaper2": "1f5de-fe0f",
    "french_bread": "1f956",
    "nerd": "1f913",
    "zipper_mouth": "1f910",
    "salad": "1f957",
    "rolling_eyes": "1f644",
    "basketball_player": "26f9-fe0f-200d-2642-fe0f",
    "thinking": "1f914",
    "e_mail": "2709-fe0f",
    "slight_frown": "1f641",
    "skull_crossbones": "2620-fe0f",
    "hand_splayed": "1f590-fe0f",
    "speaking_head": "1f5e3-fe0f",
    "cross": "271d-fe0f",
    "crayon": "1f58d-fe0f",
    "head_bandage": "1f915",
    "rofl": "1f923",
    "flag_white": "1f3f3-fe0f",
    "slight_smile": "1f642",
    "fork_knife_plate": "1f37d-fe0f",
    "robot": "1f916",
    "hugging": "1f917",
    "biohazard": "2623-fe0f",
    "notepad_spiral": "1f5d2-fe0f",
    "lifter": "1f3cb-fe0f-200d-2642-fe0f",
    "race_car": "1f3ce-fe0f",
    "left_facing_fist": "1f91b",
    "right_facing_fist": "1f91c",
    "tools": "1f6e0-fe0f",
    "umbrella2": "2602-fe0f",
    # Fixed: was "2b07-fe0f", which is the down arrow and not the
    # upside-down face (U+1F643)
    "upside_down": "1f643",
    "first_place": "1f947",
    "dagger": "1f5e1-fe0f",
    "fox": "1f98a",
    "menorah": "1f54e",
    "desktop": "1f5a5-fe0f",
    "motorcycle": "1f3cd-fe0f",
    "levitate": "1f574-fe0f",
    "cheese": "1f9c0",
    "fingers_crossed": "1f91e",
    "frowning2": "1f626",
    "microphone2": "1f399-fe0f",
    "flag_black": "1f3f4",
    "chair": "1FA91",
    "champagne_glass": "1F942",
    "raised_hand": "270B",
    "knife": "1F52A",
    "postal_horn": "1F4EF",
    "punch": "1F44A",
}

# unicode emoji -> ":short_code:", filled by load_emojis()
global_list = {}
# every known unicode emoji, filled by load_emojis()
unicode_list = []
# Matches custom emotes (<:name:id> / <a:name:id>) and :short_code: ;
# load_emojis() replaces it with a pattern that also matches unicode emojis
regex = re.compile("(<a?:[\\w\\-\\~]+:\\d+>|:[\\w\\-\\~]+:)")
def load_emojis():
    """
    Load the emoji table (emoji.json next to this module + EXTRA_EMOJI)

    Fills global_list (unicode -> ":short_code:") and unicode_list in place
    and replaces the module-level regex with one that also matches every
    loaded unicode emoji. Calling it again rebuilds everything from scratch.
    """
    global regex
    # json.load reads the whole document, whatever its line layout
    with open(
        path.join(path.dirname(__file__), "emoji.json"), mode="r", encoding="utf-8"
    ) as f:
        emoji_list = json.load(f)
    # Extra names come last so they take over the short code of a sequence
    emoji_list += [
        {"short_name": name, "unified": code} for name, code in EXTRA_EMOJI.items()
    ]
    # Emptied in place (same objects) so a second call does not duplicate entries
    global_list.clear()
    unicode_list.clear()
    unicode_list_escaped = []
    for emoji in emoji_list:
        shortname = emoji["short_name"]
        unified = emoji["unified"]
        if unified is None or shortname is None:
            continue
        # "1f3f3-fe0f" -> "\U0001f3f3\U0000fe0f" (escaped form, reused in the regex)
        unicode_escaped = "".join(f"\\U{c:0>8}" for c in unified.split("-"))
        unicode = bytes(unicode_escaped, "ascii").decode("unicode-escape")
        shortcode = shortname.replace("-", "_")
        global_list[unicode] = f":{shortcode}:"
        unicode_list.append(unicode)
        unicode_list_escaped.append(unicode_escaped)
    # Regex alternation takes the first alternative that matches: longer
    # sequences must come first or a shorter prefix would cut them
    unicode_list_escaped.sort(key=len, reverse=True)
    regex = re.compile(f"(<a?:\\w+:\\d+>|:\\w+:|{'|'.join(unicode_list_escaped)})")
    logging.info(f"loaded {len(unicode_list)} emojis")
+271
View File
@@ -0,0 +1,271 @@
import gzip
import json
import logging
import os
from datetime import datetime
from typing import AsyncIterator, List, Optional, Tuple, Union

import discord

from .utils import code_message, is_extension
# Directory (relative to the working directory) where guild logs are stored
LOG_DIR = "logs"
# Created at import time; exist_ok avoids the check-then-create race
os.makedirs(LOG_DIR, exist_ok=True)

# Maximum number of messages requested per history call
CHUNK_SIZE = 1000
# Version of the saved layout, channels saved with another value are discarded
FORMAT = 3
# Attachment extensions considered as images
IMAGE_FORMAT = ["gif", "gifv", "png", "jpg", "jpeg", "bmp"]
# Embed types considered as images
EMBED_IMAGES = ["image", "gifv"]

# Log files of the guilds currently being loaded
current_analysis = []
class FakeMessage:
    """
    Minimal object only exposing an `id`, used as a history boundary
    in place of a real message
    """

    def __init__(self, id: int) -> None:
        self.id = id
class MessageLog:
    """
    Serializable snapshot of a message, built either from a live
    discord message or from its saved dict form
    """

    def __init__(self, message: Union[discord.Message, dict]):
        if isinstance(message, discord.Message):
            self._from_message(message)
        elif isinstance(message, dict):
            self._from_dict(message)

    def _from_message(self, message: discord.Message):
        """
        Copy the relevant fields of a live message (reactions are left empty,
        see load())
        """
        author = message.author
        replied_to = message.reference
        self.id = message.id
        self.created_at = message.created_at
        self.edited_at = message.edited_at
        self.author = author.id
        self.pinned = message.pinned
        self.mention_everyone = message.mention_everyone
        self.tts = message.tts
        self.reference = None if replied_to is None else replied_to.message_id
        self.bot = author.bot or author.system
        self.content = message.content
        self.mentions = message.raw_mentions
        self.role_mentions = message.raw_role_mentions
        self.channel_mentions = message.raw_channel_mentions
        # An image is either an attachment with an image extension
        # or (only checked otherwise) an embed of an image type
        self.image = any(
            is_extension(attachment.filename, IMAGE_FORMAT)
            for attachment in message.attachments
        ) or any(embed.type in EMBED_IMAGES for embed in message.embeds)
        self.reactions = {}

    def _from_dict(self, message: dict):
        """
        Restore the fields from the structure produced by dict()
        """
        edited_at = message["edited_at"]
        reference = message["reference"]
        self.id = int(message["id"])
        self.created_at = datetime.fromisoformat(message["created_at"])
        self.edited_at = None if edited_at is None else datetime.fromisoformat(edited_at)
        self.author = int(message["author"])
        self.pinned = message["pinned"]
        self.mention_everyone = message["mention_everyone"]
        self.tts = message["tts"]
        self.reference = None if reference is None else int(reference)
        self.bot = message["bot"]
        self.content = message["content"]
        self.mentions = [int(m) for m in message["mentions"]]
        self.role_mentions = [int(m) for m in message["role_mentions"]]
        self.channel_mentions = [int(m) for m in message["channel_mentions"]]
        self.image = message["image"]
        self.reactions = message["reactions"]

    async def load(self, message: discord.Message):
        """
        Fetch, for each reaction of the message, the ids of the reacting users
        """
        for reaction in message.reactions:
            key = str(reaction.emoji)
            self.reactions[key] = []
            async for user in reaction.users():
                self.reactions[key].append(user.id)

    def dict(self) -> dict:
        """
        Get a copy of the fields with dates converted to ISO strings
        """
        message = {**self.__dict__}
        message["created_at"] = self.created_at.isoformat()
        message["edited_at"] = (
            None if self.edited_at is None else self.edited_at.isoformat()
        )
        return message
class ChannelLogs:
    """
    Message history of one text channel, built either from a live
    channel (empty history) or from its saved dict form
    """

    def __init__(self, channel: Union[discord.TextChannel, dict]):
        if isinstance(channel, discord.TextChannel):
            self.id = channel.id
            self.name = channel.name
            # None means nothing was loaded yet (see load())
            self.last_message_id = None
            self.format = FORMAT
            self.messages = []
        elif isinstance(channel, dict):
            self.format = channel.get("format")
            if self.format != FORMAT:
                # Other layout: no data is restored, only `format` is set
                return
            self.id = int(channel["id"])
            self.name = channel["name"]
            # Saved as null when the channel had no last message:
            # keep None instead of crashing on int(None)
            last_message_id = channel["last_message_id"]
            self.last_message_id = (
                int(last_message_id) if last_message_id is not None else None
            )
            self.messages = [MessageLog(message) for message in channel["messages"]]

    async def load(self, channel: discord.TextChannel) -> AsyncIterator[Tuple[int, bool]]:
        """
        Fetch the channel history, newest messages first in self.messages

        Async generator yielding (number of messages stored, done) after
        each chunk, then once with done=True on completion. When an HTTP
        error occurs the generator stops without yielding done=True.
        """
        self.name = channel.name
        self.channel = channel
        try:
            if self.last_message_id is not None:  # append
                while self.last_message_id != channel.last_message_id:
                    fetched = 0
                    async for message in channel.history(
                        limit=CHUNK_SIZE,
                        after=FakeMessage(self.last_message_id),
                        oldest_first=True,
                    ):
                        fetched += 1
                        self.last_message_id = message.id
                        m = MessageLog(message)
                        await m.load(message)
                        self.messages.insert(0, m)
                    if fetched == 0:
                        # Nothing newer can be fetched (the channel's last
                        # message id may not match any fetchable message):
                        # stop instead of requesting forever
                        break
                    yield len(self.messages), False
            else:  # first load
                last_message_id = None
                # Always request one chunk, then go on while chunks are full
                done = CHUNK_SIZE
                while done >= CHUNK_SIZE:
                    done = 0
                    async for message in channel.history(
                        limit=CHUNK_SIZE,
                        before=FakeMessage(last_message_id)
                        if last_message_id is not None
                        else None,
                        oldest_first=False,
                    ):
                        done += 1
                        last_message_id = message.id
                        m = MessageLog(message)
                        await m.load(message)
                        self.messages += [m]
                    yield len(self.messages), False
                self.last_message_id = channel.last_message_id
        except discord.errors.HTTPException:
            return  # When an exception occurs (like Forbidden)
        yield len(self.messages), True

    def dict(self) -> dict:
        """
        Get a serializable copy of the fields (without the live channel)
        """
        channel = dict(self.__dict__)
        channel.pop("channel", None)
        channel["messages"] = [message.dict() for message in self.messages]
        return channel
class GuildLogs:
    """
    Message history of a guild, cached on disk as gzipped JSON
    """

    def __init__(self, guild: discord.Guild):
        self.guild = guild
        self.log_file = os.path.join(LOG_DIR, f"{guild.id}.logz")
        # channel id -> ChannelLogs
        self.channels = {}

    def dict(self) -> dict:
        """
        Get the serializable form of every channel, by channel id
        """
        return {id: self.channels[id].dict() for id in self.channels}

    async def load(
        self,
        progress: discord.Message,
        target_channels: Optional[List[discord.TextChannel]] = None,
    ) -> Tuple[int, int]:
        """
        Read the saved history, fetch what is missing and save the result

        :param progress: message edited to report the progression
        :param target_channels: channels to load, every text channel of
            the guild when empty or omitted
        :return: (number of messages, number of channels) fully loaded,
            or (-1, -1) when this guild is already being loaded
        """
        if self.log_file in current_analysis:
            return -1, -1
        current_analysis.append(self.log_file)
        try:
            await self._read_saved_logs(progress)
            if not target_channels:
                target_channels = self.guild.text_channels
            total_msg, total_chan = await self._query_channels(
                progress, target_channels
            )
            await self._write_logs(progress, total_msg, total_chan)
            await code_message(
                progress, f"Analysing...\n{total_msg:,} messages in {total_chan} channels"
            )
            return total_msg, total_chan
        finally:
            # Always release the guild, otherwise a single failure would make
            # every later call return (-1, -1)
            current_analysis.remove(self.log_file)

    async def _read_saved_logs(self, progress: discord.Message):
        """
        Fill self.channels from the log file, if it exists and is readable
        """
        t0 = datetime.now()
        if not os.path.exists(self.log_file):
            return
        try:
            await code_message(progress, "Reading saved history (1/4)...")
            with open(self.log_file, mode="rb") as f:
                gziped_data = f.read()
            await code_message(progress, "Reading saved history (2/4)...")
            json_data = gzip.decompress(gziped_data)
            await code_message(progress, "Reading saved history (3/4)...")
            channels = json.loads(json_data)
            await code_message(progress, "Reading saved history (4/4)...")
            loaded = {int(id): ChannelLogs(channels[id]) for id in channels}
            # remove invalid format
            self.channels = {
                id: loaded[id] for id in loaded if loaded[id].format == FORMAT
            }
            dt = (datetime.now() - t0).total_seconds()
            logging.info(f"log {self.guild.id} > loaded in {dt} s")
        except json.decoder.JSONDecodeError:
            logging.error(f"log {self.guild.id} > invalid JSON")
        except IOError:
            logging.error(f"log {self.guild.id} > cannot read")

    async def _query_channels(
        self, progress: discord.Message, target_channels: List[discord.TextChannel]
    ) -> Tuple[int, int]:
        """
        Load the given channels and return the (messages, channels) totals
        of those fully loaded
        """
        t0 = datetime.now()
        loading_new = 0
        total_msg = 0
        queried_msg = 0
        total_chan = 0
        max_chan = len(target_channels)
        await code_message(
            progress,
            f"Reading history...\n0 messages in 0/{max_chan} channels\n(this might take a while)",
        )
        for channel in target_channels:
            if channel.id not in self.channels:
                loading_new += 1
                self.channels[channel.id] = ChannelLogs(channel)
            channel_logs = self.channels[channel.id]
            start_msg = len(channel_logs.messages)
            async for count, done in channel_logs.load(channel):
                if count > 0:
                    tmp_queried_msg = queried_msg + count - start_msg
                    tmp_msg = total_msg + count
                    warning_msg = "(this might take a while)"
                    if len(target_channels) > 5 and loading_new > 5:
                        warning_msg = (
                            "(most channels are new, this might take a looong while)"
                        )
                    elif loading_new > 0:
                        warning_msg = (
                            "(some channels are new, this might take a long while)"
                        )
                    dt = (datetime.now() - t0).total_seconds()
                    # The elapsed time can be 0 on coarse clocks
                    rate = round(tmp_queried_msg / dt) if dt > 0 else 0
                    await code_message(
                        progress,
                        f"Reading history...\n{tmp_msg:,} messages in {total_chan + 1}/{max_chan} channels ({rate}m/s)\n{warning_msg}",
                    )
                if done:
                    total_chan += 1
                    total_msg += len(channel_logs.messages)
                    queried_msg += count - start_msg
        dt = (datetime.now() - t0).total_seconds()
        rate = queried_msg / dt if dt > 0 else 0.0
        logging.info(f"log {self.guild.id} > queried in {dt} s -> {rate} m/s")
        return total_msg, total_chan

    async def _write_logs(
        self, progress: discord.Message, total_msg: int, total_chan: int
    ):
        """
        Save every channel to the log file as gzipped JSON
        """
        t0 = datetime.now()
        await code_message(
            progress,
            f"Saving (1/3)...\n{total_msg:,} messages in {total_chan} channels",
        )
        json_data = bytes(json.dumps(self.dict()), "utf-8")
        await code_message(
            progress,
            f"Saving (2/3)...\n{total_msg:,} messages in {total_chan} channels",
        )
        gziped_data = gzip.compress(json_data)
        await code_message(
            progress,
            f"Saving (3/3)...\n{total_msg:,} messages in {total_chan} channels",
        )
        with open(self.log_file, mode="wb") as f:
            f.write(gziped_data)
        dt = (datetime.now() - t0).total_seconds()
        logging.info(f"log {self.guild.id} > written in {dt} s")
+122
View File
@@ -0,0 +1,122 @@
from typing import List, Dict
import os
import logging
import discord
# DISCORD API
def debug(message: discord.Message, txt: str):
    """
    Log a text (info level) prefixed with the guild and channel of a message
    """
    origin = f"{message.guild} > #{message.channel}"
    logging.info(f"{origin}: {txt}")
async def code_message(message: discord.Message, content: str):
    """
    Replace the content of a message with the given text in a code block
    """
    wrapped = f"```\n{content}\n```"
    await message.edit(content=wrapped)
def mention(member_id: int) -> str:
    """
    Get the mention markup of a member from its id
    """
    return "<@{}>".format(member_id)
# FILE
def is_extension(filepath: str, ext_list: List[str]) -> bool:
    """
    Tell if a file path ends with one of the given extensions

    The path is lowercased first. Extensions can be given with or
    without the leading dot ("png" or ".png").
    """
    _, file_extension = os.path.splitext(filepath.lower())
    if file_extension in ext_list:
        return True
    # splitext keeps the leading dot (".png"): also compare without it,
    # otherwise dot-less lists like ["png", "jpg"] never match
    return bool(file_extension) and file_extension[1:] in ext_list
# LISTS
def no_duplicate(seq: list) -> list:
    """
    Build a list keeping only the first occurrence of each item

    :param seq: original list (items must be hashable)
    :type seq: list
    :return: new list with no duplicates, in order of first appearance
    :rtype: list
    """
    seen = {}
    for item in seq:
        seen.setdefault(item, None)
    return list(seen)
# MESSAGE FORMATTING
def aggregate(names: List[str]) -> str:
    """
    Join names with ", " and a final " & "

    Example : "a, b, c & d"
    """
    if not names:
        return ""
    *firsts, last = names
    if not firsts:
        return last
    return ", ".join(firsts) + " & " + last
def plural(count: int, word: str) -> str:
    """
    Get "<count> <word>" with an "s" appended unless count is 1
    """
    suffix = "" if count == 1 else "s"
    return " ".join((str(count), word + suffix))
def day_interval(interval: int) -> str:
    """
    Get a readable form of a number of days in the past

    :param interval: number of days elapsed
    :return: "today", "yesterday" or "N days ago"
    """
    # A negative interval (date slightly in the future, e.g. when the
    # compared dates are not expressed in the same timezone) is shown as
    # "today" rather than "-1 days ago"
    if interval <= 0:
        return "today"
    if interval == 1:
        return "yesterday"
    return f"{interval} days ago"
# APP SPECIFIC
def get_intro(
    subject: str,
    full: bool,
    channels: List[discord.TextChannel],
    members: List[discord.Member],
    nmm: int,  # number of messages impacted
    nc: int,  # number of impacted channels
) -> str:
    """
    Get the introduction sentence of the response

    Members and channels are listed by mention when there are less
    than 5 of them, and only counted otherwise.
    """
    # Common ending of every sentence except the full server scan
    ending = f"{subject} in {nmm:,} messages:"

    def channel_mentions() -> str:
        return aggregate([c.mention for c in channels])

    if len(members) == 0:
        # Full scan of the server
        if full:
            return f"{subject} in this server ({nc} channels, {nmm:,} messages):"
        if len(channels) < 5:
            return f"{channel_mentions()} {ending}"
        return f"These {len(channels)} channels {ending}"

    if len(members) < 5:
        who = aggregate([m.mention for m in members])
    else:
        who = f"These {len(members)} members"
    if full:
        return f"{who} {ending}"
    if len(channels) < 5:
        where = channel_mentions()
    else:
        where = f"these {len(channels)} channels"
    return f"{who} on {where} {ending}"