12 Commits

Author SHA1 Message Date
klemek 77be9bc356 zipping logs 2021-01-06 19:05:18 +01:00
klemek 6b5d0dd1fe working logs query 2021-01-06 18:58:19 +01:00
klemek 014a792ad4 log management to improve speed 2021-01-06 16:30:53 +01:00
klemek 5b5e3f178b trying to optimize things (found it, it's coming) 2021-01-06 14:36:51 +01:00
klemek 2beeb1183f all emojis listing 2021-01-06 14:14:57 +01:00
klemek deebe7855c typing + transition to miniscord 2021-01-06 12:09:28 +01:00
klemek 9f7ffff43c transition to miniscord 2021-01-06 12:09:10 +01:00
Klemek 84682c015e Merge pull request #3 from Klemek/dev
release v1.3
2019-11-27 11:15:09 +01:00
Klemek 798e9a89ba Update bot.py 2019-11-27 11:14:20 +01:00
Klemek 153566cf4c Removed rerun of client
Error was in discord lib, handled with requirements
2019-11-27 11:13:54 +01:00
Klemek 63a200aed2 Update README.md 2019-11-27 11:12:39 +01:00
Klemek 18032a4683 Updated requirements.txt 2019-11-27 11:11:44 +01:00
9 changed files with 450 additions and 355 deletions
Regular → Executable
+3 -1
View File
@@ -2,4 +2,6 @@
venv venv
__pycache__ __pycache__
.env .env
error_* error_*
*.log
logs/
+1
View File
@@ -46,6 +46,7 @@ python3 bot.py
## Changelog ## Changelog
* **v1.3**: revert to v1.1 and update requirements
* **v1.2**: don't quit on occasional exception * **v1.2**: don't quit on occasional exception
* **v1.1**: * **v1.1**:
* coma separator for big numbers * coma separator for big numbers
Regular → Executable
+20 -104
View File
@@ -1,109 +1,25 @@
import os from miniscord import Bot
import traceback import logging
import discord
from datetime import datetime
from dotenv import load_dotenv
# Custom libs
import emotes import emotes
import help
from utils import debug
VERSION = "1.2" logging.basicConfig(
t0 = datetime.now() format="[%(asctime)s][%(levelname)s][%(module)s] %(message)s", level=logging.INFO
)
# Loading token emotes.load_emojis()
load_dotenv()
token = os.getenv('DISCORD_TOKEN')
client = discord.Client() bot = Bot(
"Discord Analyst", # name
"1.4", # version
async def info(message, args): alias="%", # respond to '|command' messages
""" )
Computes the %info command bot.log_calls = True
bot.client.bot = bot # TODO place in miniscord
:param message: message sent bot.register_command(
:type message: :class:`discord.Message` "emotes", # command text (regex)
:param args: arguments of the command emotes.compute, # command function
:type args: list[:class:`str`] "emotes: Emotes analysis", # short help
""" emotes.HELP,
await message.channel.send(f"```Discord Analyst v{VERSION} started at {t0:%Y-%m-%d %H:%M}```") )
bot.start()
COMMANDS = {
"%help": help.compute,
"%emotes": emotes.compute,
"%info": info
}
@client.event
async def on_ready():
"""
Called when client is connected
"""
# Change status
await client.change_presence(
activity=discord.Game(f"v{VERSION} | %help"),
status=discord.Status.online
)
# Debug connected guilds
print(f'{client.user} v{VERSION} has connected to Discord\nto the following guilds:')
for guild in client.guilds:
print(f'- {guild.name}(id: {guild.id})')
@client.event
async def on_message(message):
"""
Called when a message is sent to any channel on any guild
:param message: message sent
:type message: discord.Message
"""
# Ignore self messages
if message.author == client.user:
return
args = message.content.split(" ")
if len(args) < 1 or args[0] not in COMMANDS:
return
debug(message, f"command '{message.content}'")
# Check if bot can respond on current channel or DM user
permissions = message.channel.permissions_for(message.guild.me)
if not permissions.send_messages:
debug(message, f"missing 'send_messages' permission")
await message.author.create_dm()
await message.author.dm_channel.send(
f"Hi, this bot doesn\'t have the permission to send a message to"
f" #{message.channel} in server '{message.guild}'")
return
# Redirect to the correct command
await COMMANDS[args[0]](message, args)
print(f"Current PID: {os.getpid()}")
# Launch client and rerun on errors
while True:
try:
client.run(token)
break # clean kill
except Exception as e:
t = datetime.now()
print(f"Exception raised at {t:%Y-%m-%d %H:%M} : {repr(e)}")
fileName = f"error_{t:%Y-%m-%d_%H-%M-%S}.txt"
if os.path.exists(fileName):
print("Two many errors, killing")
break
with open(fileName, 'w') as f:
f.write(f"Discord Analyst v{VERSION} started at {t0:%Y-%m-%d %H:%M}\r\n"
f"Exception raised at {t:%Y-%m-%d %H:%M}\r\n"
f"\r\n"
f"{traceback.format_exc()}")
+1
View File
File diff suppressed because one or more lines are too long
+206 -200
View File
@@ -1,37 +1,128 @@
from typing import Dict, List, Tuple, Optional
from datetime import datetime from datetime import datetime
from collections import defaultdict
import discord import discord
import re import re
import json
import logging
# Custom libs # Custom libs
import help
from utils import debug, aggregate, no_duplicate from utils import debug, aggregate, no_duplicate
from log_manager import GuildLogs, ChannelLogs, MessageLog
# CONSTANTS # CONSTANTS
CHUNK_SIZE = 10000 CHUNK_SIZE = 1000
# preload
EXTRA_EMOJI = {
"thumbup": "1f44d",
"thumbdown": "1f44e",
"timer": "23f2-fe0f",
"cowboy": "1f920",
"clown": "1f921",
"newspaper2": "1f5de-fe0f",
"french_bread": "1f956",
"nerd": "1f913",
"zipper_mouth": "1f910",
"salad": "1f957",
"rolling_eyes": "1f644",
"basketball_player": "26f9-fe0f-200d-2642-fe0f",
"thinking": "1f914",
"e_mail": "2709-fe0f",
"slight_frown": "1f641",
"skull_crossbones": "2620-fe0f",
"hand_splayed": "1f590-fe0f",
"speaking_head": "1f5e3-fe0f",
"cross": "271d-fe0f",
"crayon": "1f58d-fe0f",
"head_bandage": "1f915",
"rofl": "1f923",
"flag_white": "1f3f3-fe0f",
"slight_smile": "1f642",
"fork_knife_plate": "1f37d-fe0f",
"robot": "1f916",
"hugging": "1f917",
"biohazard": "2623-fe0f",
"notepad_spiral": "1f5d2-fe0f",
"lifter": "1f3cb-fe0f-200d-2642-fe0f",
"race_car": "1f3ce-fe0f",
"left_facing_fist": "1f91b",
"right_facing_fist": "1f91c",
"tools": "1f6e0-fe0f",
"umbrella2": "2602-fe0f",
"upside_down": "2b07-fe0f",
"first_place": "1f947",
"dagger": "1f5e1-fe0f",
"fox": "1f98a",
"menorah": "1f54e",
"desktop": "1f5a5-fe0f",
"motorcycle": "1f3cd-fe0f",
"levitate": "1f574-fe0f",
"cheese": "1f9c0",
"fingers_crossed": "1f91e",
"frowning2": "1f626",
"microphone2": "1f399-fe0f",
"flag_black": "1f3f4",
"chair": "1FA91",
}
GLOBAL_EMOJIS = {}
EMOJI_REGEX = re.compile("(<a?:\\w+:\\d+>|:\\w+:)")
def load_emojis():
global GLOBAL_EMOJIS, INV_GLOBAL_EMOJIS, EMOJI_REGEX
emoji_list = []
with open("emoji.json", mode="r") as f:
emoji_list = json.loads(f.readline().strip())
for emoji in EXTRA_EMOJI:
emoji_list += [{"short_name": emoji, "unified": EXTRA_EMOJI[emoji]}]
unicode_list = []
for emoji in emoji_list:
shortcode = emoji["short_name"]
unified = emoji["unified"]
if unified is not None and shortcode is not None:
unicode_escaped = "".join([f"\\U{c:0>8}" for c in unified.split("-")])
unicode = bytes(unicode_escaped, "ascii").decode("unicode-escape")
shortcode = f":{shortcode.replace('-','_')}:"
GLOBAL_EMOJIS[unicode] = shortcode
unicode_list += [unicode_escaped]
EMOJI_REGEX = re.compile(f"(<a?:\\w+:\\d+>|:\\w+:|{'|'.join(unicode_list)})")
logging.info(f"loaded {len(GLOBAL_EMOJIS)} emojis")
# MAIN # MAIN
async def compute(message, args): HELP = (
"```\n"
+ "%emotes : Rank emotes by their usage\n"
+ "arguments:\n"
+ "* @member : filter for one or more member\n"
+ "* #channel : filter for one or more channel\n"
+ "* reactions : add reaction analysis for members (long)\n"
+ "* all : list all common emojis in addition to this guild's\n"
+ "```"
)
async def compute(client: discord.client, message: discord.Message, *args: str):
""" """
Computes the %emotes command Computes the %emotes command
:param message: message sent
:type message: :class:`discord.Message`
:param args: arguments of the command
:type args: list[:class:`str`]
""" """
guild = message.guild guild = message.guild
logs = GuildLogs(guild)
# If "%emotes help" redirect to "%help emotes" # If "%emotes help" redirect to "%help emotes"
if len(args) > 1 and args[1] == "help": if "help" in args:
await help.compute(message, ["%help", "emotes"]) await client.bot.help(client, message, "help", "emotes")
return return
# Create emotes dict from custom emojis of the guild # Create emotes dict from custom emojis of the guild
emotes = {str(emoji): Emote(emoji) for emoji in guild.emojis} emotes = defaultdict(Emote)
for emoji in guild.emojis:
emotes[str(emoji)] = Emote(emoji)
# Get selected channels or all of them if no channel arguments # Get selected channels or all of them if no channel arguments
channels = no_duplicate(message.channel_mentions) channels = no_duplicate(message.channel_mentions)
@@ -41,95 +132,64 @@ async def compute(message, args):
# Get selected members # Get selected members
members = no_duplicate(message.mentions) members = no_duplicate(message.mentions)
raw_members = no_duplicate(message.raw_mentions)
# Start computing data # Start computing data
async with message.channel.typing(): async with message.channel.typing():
nm = 0 # number of messages treated progress = await message.channel.send("```Starting analysis...```")
nmm = 0 # number of impacted messages total_msg, total_chan = await logs.load(progress, channels)
nc = 0 # number of channel treated for id in logs.channels:
t0 = datetime.now() analyse_channel(
# Show custom progress message and keep it to update it later logs.channels[id], emotes, raw_members, all_emojis="all" in args
progress = await message.channel.send(f"```starting analysis...```") )
# Analyse every channel selected
for channel in channels:
nm1, nmm1 = await analyse_channel(channel, emotes, members, progress, nm, nc)
# If treatment was successful, increase numbers
if nm1 >= 0:
nm += nm1
nmm += nmm1
nc += 1
# Delete custom progress message # Delete custom progress message
await progress.delete() await progress.delete()
# Display results # Display results
await tell_results(get_intro(emotes, full, channels, members, nmm, nc), await tell_results(
emotes, message.channel, nmm, allow_unused=full and len(members) == 0, show_life=False) get_intro(emotes, full, channels, members, total_msg, total_chan),
dt = (datetime.now() - t0).total_seconds() emotes,
debug(message, f"response sent {dt} s -> {nm / dt} m/s") message.channel,
total_msg,
allow_unused=full and len(members) == 0,
show_life=False,
)
# CLASSES # CLASSES
class Emote: class Emote:
""" """
Custom class to store emotes data Custom class to store emotes data
:param emoji: source discord emoji
:type emoji: :class:`discord.Emoji`
:ivar emoji: discord emoji
:vartype emoji: discord.Emoji
:ivar usages: number of usages in messages
:vartype usages: int
:ivar reactions: number of reactions below messages (always 0 if member specified)
:vartype reactions: int
:ivar last_used: date of last use
:vartype last_used: datetime
""" """
def __init__(self, emoji): def __init__(self, emoji: Optional[discord.Emoji] = None):
self.emoji = emoji self.emoji = emoji
self.usages = 0 self.usages = 0
self.reactions = 0 self.reactions = 0
self.last_used = None self.last_used = None
def update_use(self, date): def update_use(self, date: datetime):
""" """
Update last use date if more recent Update last use date if more recent
:param date: date of use
:type date: datetime
""" """
if self.last_used is None or date > self.last_used: if self.last_used is None or date > self.last_used:
self.last_used = date self.last_used = date
def used(self): def used(self) -> bool:
"""
:return: If this emote was used ever
:rtype: bool
"""
return self.usages > 0 or self.reactions > 0 return self.usages > 0 or self.reactions > 0
def score(self): def score(self) -> float:
"""
:return: Score of this emote to be sorted
:rtype: float
"""
# Score is compose of usages + reactions # Score is compose of usages + reactions
# When 2 emotes have the same score, the days since last use is stored in the digits # When 2 emotes have the same score,
# the days since last use is stored in the digits
# (more recent first) # (more recent first)
return self.usages + self.reactions + 1 / (100000 * (self.use_days() + 1)) return self.usages + self.reactions + 1 / (100000 * (self.use_days() + 1))
def life_days(self): def life_days(self) -> int:
"""
:return: Days since creation
:rtype: int
"""
return (datetime.today() - self.emoji.created_at).days return (datetime.today() - self.emoji.created_at).days
def use_days(self): def use_days(self) -> int:
"""
:return: Days since last use
:rtype: int
"""
# If never used, use creation date instead # If never used, use creation date instead
if self.last_used is None: if self.last_used is None:
return self.life_days() return self.life_days()
@@ -139,84 +199,57 @@ class Emote:
# ANALYSIS # ANALYSIS
async def analyse_channel(channel, emotes, members, progress, nm0, nc):
"""
Analyses a given channel
:param channel: channel to analyse def analyse_channel(
:type channel: discord.TextChannel channel: ChannelLogs,
:param emotes: known emotes emotes: Dict[str, Emote],
:type emotes: dict[str, Emote] raw_members: List[int],
:param members: selected members or empty for all *,
:type members: list[discord.Member] all_emojis: bool,
:param progress: custom progress message (already sent) ):
:type progress: discord.Message for message in channel.messages:
:param nm0: number of already analysed messages # If author included in the selection (empty list is all)
:type nm0: int if len(raw_members) == 0 or message.author in raw_members:
:param nc: number of already analysed channels # Find all emotes un the current message in the form "<:emoji:123456789>"
:type nc: int # Filter for known emotes
:return: nm, nc (-1 on exception) found = EMOJI_REGEX.findall(message.content)
:rtype: int, int # For each emote, update its usage
""" for name in found:
nm = 0 if name not in emotes:
nmm = 0 if not all_emojis or name not in GLOBAL_EMOJIS:
try: continue
messages = [None] name = GLOBAL_EMOJIS[name]
while len(messages) >= CHUNK_SIZE or messages[-1] is None: emotes[name].usages += 1
messages = await channel.history(limit=CHUNK_SIZE, before=messages[-1]).flatten() emotes[name].update_use(message.created_at)
for m in messages: # For each reaction of this message, test if known emote and update when it's the case
# If author is not bot or included in the selection (empty list is all) for name in message.reactions:
if not m.author.bot and (len(members) == 0 or m.author in members): raw_name = name
# Find all emotes un the current message in the form "<:emoji:123456789>" if name not in emotes:
# Filter for known emotes if not all_emojis or name not in GLOBAL_EMOJIS:
found = [name for name in re.findall(r"(<:\w+:\d+>)", m.content) if name in emotes] continue
# For each emote, update its usage name = GLOBAL_EMOJIS[name]
for name in found: if len(raw_members) == 0:
emotes[name].usages += 1 emotes[name].reactions += len(message.reactions[raw_name])
emotes[name].update_use(m.created_at) emotes[name].update_use(message.created_at)
# Count this message as impacted else:
nmm += 1 for member in raw_members:
# For each reaction of this message, test if known emote and update when it's the case if member in message.reactions[raw_name]:
for reaction in m.reactions: emotes[name].reactions += 1
name = str(reaction.emoji) emotes[name].update_use(message.created_at)
# reaction.emoji can be only str, we don't want that
if not (isinstance(reaction.emoji, str)) and name in emotes:
if len(members) == 0:
emotes[name].reactions += reaction.count
emotes[name].update_use(m.created_at)
""" else:
users = await reaction.users().flatten()
for member in members:
if member in users:
emotes[name].reactions += 1
emotes[name].update_use(m.created_at)"""
nm += len(messages)
await progress.edit(content=f"```{nm0 + nm:,} messages and {nc} channels analysed```")
return nm, nmm
except discord.errors.HTTPException:
# When an exception occurs (like Forbidden) sent -1
return -1, -1
# RESULTS # RESULTS
async def tell_results(intro, emotes, channel, nmm, *, allow_unused, show_life):
"""
Send the full results message
:param intro: introduction sentence (from get_intro) async def tell_results(
:type intro: str intro: str, # introduction sentence (from get_intro)
:param emotes: known emotes emotes: Dict[str, Emote],
:type emotes: dict[str, Emote] channel: discord.TextChannel,
:param channel: where to send the message (original message channel) nmm: int, # number of impacted messages
:type channel: discord.TextChannel *,
:param nmm: number of impacted messages allow_unused: bool,
:type nmm: int show_life: bool,
:param allow_unused: show unused emotes ):
:type allow_unused: bool
:param show_life: show emotes life span
:type show_life: bool
"""
names = [name for name in emotes] names = [name for name in emotes]
names.sort(key=lambda name: emotes[name].score(), reverse=True) names.sort(key=lambda name: emotes[name].score(), reverse=True)
res = [intro] res = [intro]
@@ -226,7 +259,9 @@ async def tell_results(intro, emotes, channel, nmm, *, allow_unused, show_life):
f"{get_reactions(emotes[name])}" f"{get_reactions(emotes[name])}"
f"{get_life(emotes[name], show_life)}" f"{get_life(emotes[name], show_life)}"
f"{get_last_used(emotes[name])}" f"{get_last_used(emotes[name])}"
for name in names if allow_unused or emotes[name].used()] for name in names
if allow_unused or emotes[name].used()
]
res += [get_total(emotes, nmm)] res += [get_total(emotes, nmm)]
response = "" response = ""
for r in res: for r in res:
@@ -238,24 +273,16 @@ async def tell_results(intro, emotes, channel, nmm, *, allow_unused, show_life):
await channel.send(response) await channel.send(response)
def get_intro(emotes, full, channels, members, nmm, nc): def get_intro(
emotes: Dict[str, Emote],
full: bool,
channels: List[discord.TextChannel],
members: List[discord.Member],
nmm: int, # number of messages impacted
nc: int, # number of channels analysed
) -> str:
""" """
Get the introduction sentence of the response Get the introduction sentence of the response
:param emotes: known emotes
:type emotes: dict[str, Emote]
:param full: if the scan contained all channels
:type full: bool
:param channels: channels selected (ignored if full is True)
:type channels: list[discord.TextChannel]
:param members: members selected (empty for all)
:type members: list[discord.Member]
:param nmm: number of messages impacted
:type nmm: int
:param nc: number of channels analysed
:type nc: int
:return: the correct intro sentence
:rtype: str
""" """
# Show all data (members, channels) when it's less than 5 units # Show all data (members, channels) when it's less than 5 units
if len(members) == 0: if len(members) == 0:
@@ -270,30 +297,33 @@ def get_intro(emotes, full, channels, members, nmm, nc):
if full: if full:
return f"{aggregate([m.mention for m in members])} emotes usage in {nmm:,} messages:" return f"{aggregate([m.mention for m in members])} emotes usage in {nmm:,} messages:"
elif len(channels) < 5: elif len(channels) < 5:
return f"{aggregate([m.mention for m in members])} on {aggregate([c.mention for c in channels])} " \ return (
f"emotes usage in {nmm:,} messages:" f"{aggregate([m.mention for m in members])} on {aggregate([c.mention for c in channels])} "
f"emotes usage in {nmm:,} messages:"
)
else: else:
return f"{aggregate([m.mention for m in members])} on these {len(channels)} channels " \ return (
f"emotes usage in {nmm:,} messages:" f"{aggregate([m.mention for m in members])} on these {len(channels)} channels "
f"emotes usage in {nmm:,} messages:"
)
else: else:
if full: if full:
return f"These {len(members)} members emotes usage in {nmm:,} messages:" return f"These {len(members)} members emotes usage in {nmm:,} messages:"
elif len(channels) < 5: elif len(channels) < 5:
return f"These {len(members)} members on {aggregate([c.mention for c in channels])} " \ return (
f"emotes usage in {nmm:,} messages:" f"These {len(members)} members on {aggregate([c.mention for c in channels])} "
f"emotes usage in {nmm:,} messages:"
)
else: else:
return f"These {len(members)} members on these {len(channels)} channels " \ return (
f"emotes usage in {nmm:,} messages:" f"These {len(members)} members on these {len(channels)} channels "
f"emotes usage in {nmm:,} messages:"
)
def get_place(i): def get_place(i: int) -> str:
""" """
Get the correct rank displayed (1st to 3rd have an emoji) Get the correct rank displayed (1st to 3rd have an emoji)
:param i: index
:type i: int
:return: rank string
:rtype: str
""" """
if i == 0: if i == 0:
return ":first_place:" return ":first_place:"
@@ -304,13 +334,9 @@ def get_place(i):
return f"**#{i + 1}**" return f"**#{i + 1}**"
def get_usage(emote): def get_usage(emote: Emote) -> str:
""" """
Get the correct usage displayed Get the correct usage displayed
:type emote: Emote
:return: usage description
:rtype: str
""" """
if emote.usages == 0 and emote.reactions == 0: if emote.usages == 0 and emote.reactions == 0:
return "never used " return "never used "
@@ -320,12 +346,9 @@ def get_usage(emote):
return f"{emote.usages:,} times " return f"{emote.usages:,} times "
def get_reactions(emote): def get_reactions(emote: Emote) -> str:
""" """
Get the correct reactions displayed Get the correct reactions displayed
:return: reactions description
:rtype: str
""" """
if emote.reactions == 0: if emote.reactions == 0:
return "" return ""
@@ -335,29 +358,19 @@ def get_reactions(emote):
return f"and {emote.reactions:,} reactions " return f"and {emote.reactions:,} reactions "
def get_life(emote, show_life): def get_life(emote: Emote, show_life: bool) -> str:
""" """
Get the correct life span displayed Get the correct life span displayed
:type emote: Emote
:param show_life: disable if False
:type show_life: bool
:return: life description
:rtype: str
""" """
if not show_life: if not show_life or emote.default:
return "" return ""
else: else:
return f"(in {emote.life_days()} days) " return f"(in {emote.life_days()} days) "
def get_last_used(emote): def get_last_used(emote: Emote) -> str:
""" """
Get the correct "last used" displayed Get the correct "last used" displayed
:type emote: Emote
:return: last usage description
:rtype: str
""" """
if emote.usages == 0 and emote.reactions == 0: if emote.usages == 0 and emote.reactions == 0:
return "" return ""
@@ -369,16 +382,9 @@ def get_last_used(emote):
return f"(last used {emote.use_days()} days ago)" return f"(last used {emote.use_days()} days ago)"
def get_total(emotes, nmm): def get_total(emotes: Dict[str, Emote], nmm: int) -> str:
""" """
Get the total of all emotes used Get the total of all emotes used
:param emotes: known emotes
:type emotes: dict[str, Emote]
:param nmm: number of messages impacted
:type nmm: int
:return: total sentence
:rtype: str
""" """
nu = 0 nu = 0
nr = 0 nr = 0
-31
View File
@@ -1,31 +0,0 @@
async def compute(message, args):
"""
Computes the %help command
:param message: message sent
:type message: discord.Message
:param args: arguments of the command
:type args: list[str]
"""
# Select correct response to send
response = "Discord Analyst commands:\n" \
"```\n" \
"%help (command) : Info on commands\n" \
"%info : This bot info\n" \
"%emotes : Emotes analysis\n" \
"```"
if len(args) > 1 and args[1] == "emotes":
response = "Emotes Analysis:\n" \
"```\n" \
"%emotes : Rank emotes by their usage\n" \
"%emotes @user : // for a specific user\n" \
"%emotes #channel : // for a specific channel\n" \
"(Add more @user or #channel to be more selective)\n" \
"```"
await message.channel.send(response)
+206
View File
@@ -0,0 +1,206 @@
from typing import Union, List, Tuple
import os
import discord
import json
import gzip
from datetime import datetime
import logging
LOG_DIR = "logs"
if not os.path.exists(LOG_DIR):
os.mkdir(LOG_DIR)
CHUNK_SIZE = 1000
class FakeMessage:
def __init__(self, id: int):
self.id = id
class MessageLog:
def __init__(self, message: Union[discord.Message, dict]):
if isinstance(message, discord.Message):
self.id = message.id
self.created_at = message.created_at
self.edited_at = message.edited_at
self.author = message.author.id
self.pinned = message.pinned
self.mention_everyone = message.mention_everyone
self.tts = message.tts
self.reference = (
message.reference.id if message.reference is not None else None
)
self.content = message.content
self.mentions = message.raw_mentions
self.role_mentions = message.raw_role_mentions
self.channel_mentions = message.raw_channel_mentions
self.reactions = {}
elif isinstance(message, dict):
self.id = int(message["id"])
self.created_at = datetime.fromisoformat(message["created_at"])
self.edited_at = (
datetime.fromisoformat(message["edited_at"])
if message["edited_at"] is not None
else None
)
self.author = message["author"]
self.pinned = message["pinned"]
self.mention_everyone = message["mention_everyone"]
self.tts = message["tts"]
self.reference = message["reference"]
self.content = message["content"]
self.mentions = message["mentions"]
self.role_mentions = message["role_mentions"]
self.channel_mentions = message["channel_mentions"]
self.reactions = message["reactions"]
async def load(self, message: discord.Message):
for reaction in message.reactions:
self.reactions[str(reaction.emoji)] = []
async for user in reaction.users():
self.reactions[str(reaction.emoji)] += [user.id]
def dict(self) -> dict:
message = dict(self.__dict__)
message["created_at"] = self.created_at.isoformat()
message["edited_at"] = (
self.edited_at.isoformat() if self.edited_at is not None else None
)
return message
class ChannelLogs:
def __init__(self, channel: Union[discord.TextChannel, dict]):
if isinstance(channel, discord.TextChannel):
self.id = channel.id
self.name = channel.name
self.last_message_id = None
self.messages = []
elif isinstance(channel, dict):
self.id = int(channel["id"])
self.name = channel["name"]
self.last_message_id = channel["last_message_id"]
self.messages = [MessageLog(message) for message in channel["messages"]]
async def load(self, channel: discord.TextChannel) -> Tuple[int, int]:
self.name = channel.name
self.channel = channel
try:
if self.last_message_id is not None: # append
while self.last_message_id != channel.last_message_id:
async for message in channel.history(
limit=CHUNK_SIZE,
after=FakeMessage(self.last_message_id),
oldest_first=True,
):
self.last_message_id = message.id
if not message.author.bot:
m = MessageLog(message)
await m.load(message)
self.messages.insert(0, m)
yield len(self.messages), False
else: # first load
last_message_id = None
done = 0
while done >= CHUNK_SIZE or last_message_id is None:
done = 0
async for message in channel.history(
limit=CHUNK_SIZE,
before=FakeMessage(last_message_id)
if last_message_id is not None
else None,
oldest_first=False,
):
done += 1
last_message_id = message.id
if not message.author.bot:
m = MessageLog(message)
await m.load(message)
self.messages += [m]
yield len(self.messages), False
self.last_message_id = channel.last_message_id
except discord.errors.HTTPException:
return # When an exception occurs (like Forbidden)
yield len(self.messages), True
def dict(self) -> dict:
channel = dict(self.__dict__)
channel.pop("channel", None)
channel["messages"] = [message.dict() for message in self.messages]
return channel
class GuildLogs:
def __init__(self, guild: discord.Guild):
self.guild = guild
self.log_file = os.path.join(LOG_DIR, f"{guild.id}.logz")
self.channels = {}
def dict(self) -> dict:
return {id: self.channels[id].dict() for id in self.channels}
async def load(
self, progress: discord.Message, target_channels: List[discord.TextChannel] = []
):
await progress.edit(
content=f"```Reading history...\n(this might take a while)```"
)
# read logs
t0 = datetime.now()
if os.path.exists(self.log_file):
channels = {}
try:
with open(self.log_file, mode="rb") as f:
channels = json.loads(gzip.decompress(f.read()))
self.channels = {int(id): ChannelLogs(channels[id]) for id in channels}
dt = (datetime.now() - t0).total_seconds()
logging.info(f"log {self.guild.id} > loaded in {dt} s")
except json.decoder.JSONDecodeError:
logging.error(f"log {self.guild.id} > invalid JSON")
except IOError:
logging.error(f"log {self.guild.id} > cannot read")
# load channels
t0 = datetime.now()
if len(target_channels) == 0:
target_channels = self.guild.text_channels
loading_new = 0
total_msg = 0
total_chan = 0
for channel in target_channels:
if channel.id not in self.channels:
loading_new += 1
self.channels[channel.id] = ChannelLogs(channel)
async for count, done in self.channels[channel.id].load(channel):
if count > 0:
tmp_msg = total_msg + count
warning_msg = "(this might take a while)"
if len(target_channels) > 5 and loading_new > 5:
warning_msg = (
"(most channels are new, this might take a looong while)"
)
elif loading_new > 0:
warning_msg = (
"(some channels are new, this might take a long while)"
)
dt = (datetime.now() - t0).total_seconds()
await progress.edit(
content=f"```Reading history...\n{tmp_msg} messages in {total_chan + 1} channels ({round(tmp_msg/dt)}m/s)\n{warning_msg}```"
)
if done:
total_chan += 1
total_msg += len(self.channels[channel.id].messages)
dt = (datetime.now() - t0).total_seconds()
await progress.edit(
content=f"```Analysing...\n{tmp_msg} messages in {total_chan} channels```"
)
logging.info(f"log {self.guild.id} > queried in {dt} s -> {total_msg / dt} m/s")
# write logs
t0 = datetime.now()
with open(self.log_file, mode="wb") as f:
f.write(gzip.compress(bytes(json.dumps(self.dict()), "utf-8")))
dt = (datetime.now() - t0).total_seconds()
logging.info(f"log {self.guild.id} > written in {dt} s")
return total_msg, total_chan
Regular → Executable
+3 -2
View File
@@ -1,2 +1,3 @@
discord discord.py
python-dotenv python-dotenv
git+git://github.com/Klemek/miniscord.git
+10 -17
View File
@@ -1,21 +1,18 @@
from typing import List
import logging
import discord
# DISCORD API # DISCORD API
def debug(message, txt): def debug(message: discord.Message, txt: str):
""" logging.info(f"{message.guild} > #{message.channel}: {txt}")
Print a log with the context of the current event
:param message: message that triggered the event
:type message: discord.Message
:param txt: text of the log
:type txt: str
"""
print(f"{message.guild} > #{message.channel}: {txt}")
# LISTS # LISTS
def no_duplicate(seq):
def no_duplicate(seq: list) -> list:
""" """
Remove any duplicates on a list Remove any duplicates on a list
@@ -29,16 +26,12 @@ def no_duplicate(seq):
# MESSAGE FORMATTING # MESSAGE FORMATTING
def aggregate(names):
def aggregate(names: List[str]) -> str:
""" """
Aggregate names with , and & Aggregate names with , and &
Example : "a, b, c & d" Example : "a, b, c & d"
:param names: list of names
:type names: list[str]
:return: correct aggregation
:rtype: str
""" """
if len(names) == 0: if len(names) == 0:
return "" return ""