122 Commits

Author SHA1 Message Date
Klemek 2475296b2b updated readme and version 2021-03-15 13:36:52 +01:00
Klemek ae8c6d8031 fresh argument to invalidate cache 2021-03-15 13:35:46 +01:00
Klemek aba2889a63 updated readme 2021-03-14 22:50:49 +01:00
Klemek 4956f45098 all/everyone to include bots on scans 2021-03-14 22:48:58 +01:00
Klemek f2c3185fad Merge branch 'master' of github.com:Klemek/discord-analyst 2021-03-14 22:24:11 +01:00
Klemek a0b9a0865a fixed images not counted 2021-03-14 22:21:57 +01:00
Klemek e377e19a29 Update README.md 2021-03-14 10:57:00 +01:00
Klemek e56d120f5b Update README.md 2021-03-14 10:56:33 +01:00
Klemek 1b01145723 Update README.md 2021-03-14 10:55:17 +01:00
Klemek 22af84d826 fixed empty channel bug 2021-03-13 15:47:50 +01:00
klemek 4f71feb6d7 Dockerfile support 2021-01-21 11:39:58 +01:00
klemek 0cebda1936 use of discord.AllowedMentions 2021-01-14 19:57:47 +01:00
klemek 3a4462af06 typo 2021-01-14 19:07:48 +01:00
klemek 959ece013b new scanner : %react 2021-01-14 19:04:33 +01:00
klemek 8576c74730 take members args in account 2021-01-14 18:59:12 +01:00
klemek db8d610727 fix format 2021-01-14 18:54:59 +01:00
klemek 29f711c1cc small fix 2021-01-14 18:49:41 +01:00
klemek d945b67df2 I guess it's working now 2021-01-14 18:47:02 +01:00
klemek 32b481233d small fix 2021-01-14 18:46:42 +01:00
klemek 70734ea87e updated README.md 2021-01-14 18:44:38 +01:00
klemek b42e432bcd new scanner %chan 2021-01-14 18:44:10 +01:00
klemek ec027507ad small cleanup 2021-01-14 18:14:56 +01:00
klemek 7bb7700157 new scanner %msg 2021-01-14 18:14:37 +01:00
klemek 4505d7acbd mention => counter 2021-01-14 18:02:06 +01:00
klemek f0df0501e5 show members by default in emoji scan 2021-01-14 17:53:57 +01:00
klemek b97d7dfeb5 better members in emotes 2021-01-14 17:52:16 +01:00
klemek 306e8708a4 better presence for server-wide scan 2021-01-14 17:52:09 +01:00
klemek e1c619dabb new val_sum for refactoring 2021-01-14 17:22:59 +01:00
klemek c938369eba Merge branch 'master' of ssh://github.com/Klemek/discord-analyst 2021-01-14 14:50:28 +01:00
klemek 77c8d471e4 fix presence message 2021-01-14 11:12:47 +01:00
klemek 72b5fc19ee new wip version 2021-01-14 10:26:32 +01:00
Klemek 78fc073431 Update README.md 2021-01-13 17:02:03 +01:00
Klemek d524316702 Update README.md 2021-01-13 16:53:47 +01:00
klemek ce3e6424ff 1.7 release 2021-01-13 16:44:27 +01:00
klemek 8ef2c0588d help consistancy 2021-01-13 16:44:01 +01:00
klemek aa51e2cda0 %mentioned command 2021-01-13 16:41:10 +01:00
klemek 86dc115697 %mentions 2021-01-13 16:32:48 +01:00
klemek 5d994a82e0 escaped @everyone 2021-01-13 15:56:43 +01:00
klemek e02516677a cancel command 2021-01-13 15:25:02 +01:00
klemek e106ed4e41 emojis sorting 2021-01-13 15:12:28 +01:00
klemek ecf6cfe821 new wip version 2021-01-13 14:57:57 +01:00
klemek 404e25720e fixing members sorting in emojis 2021-01-11 23:01:23 +01:00
klemek 39ddb75cfe emotes alias 2021-01-11 22:40:28 +01:00
klemek a446440e86 fixed invalid value 2021-01-11 22:36:03 +01:00
klemek 3c3d3cf9dd mentions fixing 2021-01-11 22:31:30 +01:00
klemek e1f56f222b changed formulation 2021-01-11 22:26:50 +01:00
klemek 08c1581f0e fixing mentions ffs 2021-01-11 22:25:01 +01:00
klemek dd2597a926 disable fast if no logs 2021-01-11 22:14:03 +01:00
klemek 503f866ff2 most mentioned for not member specific 2021-01-11 22:11:29 +01:00
klemek 9668d96777 fetch answer as mention if available 2021-01-11 22:08:27 +01:00
klemek 1882e5ffe3 fixed custom emoji in composition scan 2021-01-11 22:03:19 +01:00
klemek a3216d58f3 fixed formatting 2021-01-11 21:54:15 +01:00
klemek dd5801b195 fix invalid mentions 2021-01-11 21:51:22 +01:00
klemek 79a1b3e023 updated version 2021-01-11 19:51:12 +01:00
klemek 08e0888a04 updated README.md 2021-01-11 19:50:55 +01:00
klemek 2038d00d64 emotes => emojis 2021-01-11 19:47:52 +01:00
klemek 91cdc36681 usage of new answer capability 2021-01-11 19:46:30 +01:00
klemek daad1a1563 small fixes 2021-01-11 19:34:22 +01:00
klemek d48031e4b5 composition scanner working yay 2021-01-11 19:23:26 +01:00
klemek e7daaed35b scanner logs 2021-01-11 18:17:54 +01:00
klemek efb6dddce4 reworked presence to get rid of useless emote analysis 2021-01-11 18:11:43 +01:00
klemek 6a6e15b054 use predefined msg_count and total_msg 2021-01-11 17:46:18 +01:00
klemek c6572a7dde %full => %scan, %comp => %compo 2021-01-11 17:41:20 +01:00
klemek 8b6d5e5361 better percent formatting 2021-01-11 17:38:55 +01:00
klemek 155fe48e70 presence scanner working yay 2021-01-11 17:26:34 +01:00
klemek 3c8f827246 fixed help 2021-01-11 16:49:20 +01:00
klemek 7281fb9a8f factorize common help 2021-01-11 16:47:29 +01:00
klemek a1d7354280 fast argument to only load cache 2021-01-11 16:43:50 +01:00
klemek ae9cc69ed2 Other => Presence 2021-01-11 16:35:33 +01:00
klemek bfe279095d me/here arguments 2021-01-11 15:57:18 +01:00
klemek 5563945dcc formatting fix 2021-01-11 15:50:26 +01:00
klemek 034562f167 working other scanner yay 2021-01-11 15:39:57 +01:00
klemek 5c848444db better sorted usage 2021-01-11 15:19:39 +01:00
klemek b671ee527e improved loading logs 2021-01-11 15:04:02 +01:00
klemek e11bbf0327 template for new scanners 2021-01-11 12:42:59 +01:00
klemek 11921ffee9 full scanner 2021-01-11 12:36:58 +01:00
klemek 5a3e7cc2b1 template for new scanners 2021-01-11 12:20:19 +01:00
klemek 68c67dd29a working frequency scanner yay 2021-01-11 12:12:17 +01:00
klemek 655b7f257f fixing frequency analysis 2021-01-10 17:57:56 +01:00
klemek ec900ea798 working on frequency analysis 2021-01-08 17:28:01 +01:00
klemek 7702e36fa0 refactoring 2021-01-08 16:31:38 +01:00
klemek ea0febfb3c more files refactoring 2021-01-08 16:25:22 +01:00
klemek eeedcb0b2d data type package 2021-01-08 16:16:04 +01:00
klemek 4e03a9a11b Base for new scanner 2021-01-08 16:09:04 +01:00
klemek 820f57f960 updated README.md 2021-01-08 16:02:43 +01:00
klemek a463bd8fc3 moved scan to static 2021-01-08 15:00:57 +01:00
klemek 98fe94275f update README.md 2021-01-08 13:12:32 +01:00
klemek 36e10d50d7 resource folder 2021-01-08 13:04:00 +01:00
klemek 5a965bbc7a code into packages 2021-01-08 12:57:23 +01:00
klemek 038b1a294c fixes from moving to src 2021-01-08 12:43:55 +01:00
klemek 6d3d1118f1 moved sources to src folder 2021-01-08 12:31:58 +01:00
klemek 4db8ee0b52 more scanner types 2021-01-08 12:30:52 +01:00
klemek 90aa399bfb ultimate factorisation 2021-01-08 12:16:18 +01:00
klemek 03003f24b0 (WIP) refactor to extract scan constants 2021-01-07 18:28:52 +01:00
klemek 69c451d3aa reading if messages have images 2021-01-07 18:08:08 +01:00
klemek 6ed8b21d76 refactoring 2021-01-07 16:28:51 +01:00
klemek 59cf039efa more logs to not freeze api 2021-01-07 16:07:30 +01:00
klemek ff9802a874 oops 2021-01-07 14:49:54 +01:00
klemek 241f27fc56 show number of times 2021-01-07 14:37:54 +01:00
klemek 1e491e0b6e fix emote message 2021-01-07 14:32:23 +01:00
klemek 34b35944e1 refactoring + "members" query 2021-01-07 14:30:01 +01:00
klemek ef52c601b9 check arguments 2021-01-07 13:36:48 +01:00
klemek af966f21d7 fix query speed in message 2021-01-07 12:42:29 +01:00
klemek 3bfb92209d fixing warning message ffs 2021-01-07 11:01:18 +01:00
klemek 58ac967fc2 fixing warning message 2021-01-07 11:00:30 +01:00
klemek 377012cf4a fixing log manager messages 2021-01-07 10:59:11 +01:00
klemek f779c699cf overload security 2021-01-07 10:50:25 +01:00
klemek 3a96a9041a fixing log manager messages 2021-01-07 10:42:25 +01:00
klemek 1c8d5e4023 fixing log manager messages 2021-01-07 10:25:35 +01:00
klemek 9a08bf717a new version 2021-01-07 09:58:32 +01:00
klemek 98acf749f5 fixed emoji mention 2021-01-07 09:55:59 +01:00
klemek 1406f5c24a fixing a lot of stuff 2021-01-07 09:53:28 +01:00
klemek 877080edbe correct message count 2021-01-06 20:38:24 +01:00
klemek 38e4c64ab3 fixed data handling 2021-01-06 19:19:13 +01:00
Klemek b8c4a79516 Update README.md 2021-01-06 19:08:23 +01:00
klemek 77be9bc356 zipping logs 2021-01-06 19:05:18 +01:00
klemek 6b5d0dd1fe working logs query 2021-01-06 18:58:19 +01:00
klemek 014a792ad4 log management to improve speed 2021-01-06 16:30:53 +01:00
klemek 5b5e3f178b trying to optimize things (found it, it's coming) 2021-01-06 14:36:51 +01:00
klemek 2beeb1183f all emojis listing 2021-01-06 14:14:57 +01:00
klemek deebe7855c typing + transition to miniscord 2021-01-06 12:09:28 +01:00
klemek 9f7ffff43c transition to miniscord 2021-01-06 12:09:10 +01:00
36 changed files with 2491 additions and 568 deletions
Regular → Executable
+3 -1
View File
@@ -2,4 +2,6 @@
venv
__pycache__
.env
error_*
error_*
*.log
/logs/
Executable
+13
View File
@@ -0,0 +1,13 @@
FROM python
# Create app directory
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install -r requirements.txt
# Bundle app source
COPY . .
CMD [ "sh", "-c", "python src/main.py" ]
+73 -5
View File
@@ -1,15 +1,49 @@
[![Scc Count Badge](https://sloc.xyz/github/klemek/discord-analyst/?category=code)](https://github.com/boyter/scc/#badges-beta)
[![Language grade: Python](https://img.shields.io/lgtm/grade/python/g/Klemek/discord-analyst.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/Klemek/discord-analyst/context:python)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/Klemek/discord-analyst.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/Klemek/discord-analyst/alerts/)
[![Discord Bots](https://top.gg/api/widget/status/643808410495615006.svg)](https://top.gg/bot/643808410495615006)
# Discord Analyst
When you need statistics about your discord server
* `%help (command)` - info about commands
* `%info` - version and uptime
* `%emotes` - gives you a full ranking of the server emotes by usage
* Be more specific by adding some `@member` or `#channel` in arguments
```
* %help (command) - info about commands
* %info - version and uptime
* %scan - full analysis
* %freq - frequency analysis
* %compo - composition analysis
* %pres - presence analysis
* %emojis - rank emotes by their usage
* arguments:
* <n> - top <n> emojis, default is 20
* all - list all common emojis in addition to this guild's
* members - show top member for each emote
* sort:usage/reaction - other sorting methods
* %mentions - rank mentions by their usage
* arguments:
* <n> - top <n> mentions, default is 10
* all - show role/channel/everyone/here mentions
* %mentioned - rank specific user mentions by their usage
* arguments:
* <n> - top <n> mentions, default is 10
* %msg - rank users by their messages
* arguments:
* <n> - top <n> messages, default is 10
* %chan - rank channels by their messages
* arguments:
* <n> - top <n> channels, default is 10
* %react - rank users by their reactions
* arguments:
* <n> - top <n> messages, default is 10
* %cancel - cancel current analysis
* Common arguments:
* @member/me : filter for one or more member
* #channel/here : filter for one or more channel
* all/everyone - include bots messages
* fast : only read cache
```
## Running this bot
@@ -41,11 +75,45 @@ You will need:
**4. Launch bot**
```
python3 bot.py
python3 src/main.py
```
## Recommanded permissions
[x] View Channels
[x] Read Message History
[x] Send Messages
> On large servers, you should disable "Send Messages" and enable it on an read-only channel where only administrators can launch commands. The bot can't be triggered elsewhere if it can't answer.
## Already hosted bot
[![Discord Bots](https://top.gg/api/widget/643808410495615006.svg)](https://top.gg/bot/643808410495615006)
## Changelog
* **v1.9**:
* `all/everyone` to include bots in scans
* `fresh` to not use previously cached data
* bug fix
* **v1.8**:
* more scans: `%msg`, `%chan`
* bug fix
* **v1.7**:
* emojis percents
* emojis other sorting
* mentions/mentioned ranking
* `%cancel`
* **v1.6**:
* more scans: `%scan`, `%freq`, `%compo`, `%pres`
* huge bug fix
* **v1.5**:
* top <n> emotes
* bug fix
* **v1.4**:
* integrate miniscord
* insane speed with bot-side logging
* bug fix
* **v1.3**: revert to v1.1 and update requirements
* **v1.2**: don't quit on occasional exception
* **v1.1**:
-92
View File
@@ -1,92 +0,0 @@
import os
import discord
from datetime import datetime
from dotenv import load_dotenv
# Custom libs
import emotes
import help
from utils import debug
VERSION = "1.3"
t0 = datetime.now()
# Loading token
load_dotenv()
token = os.getenv('DISCORD_TOKEN')
client = discord.Client()
async def info(message, args):
"""
Computes the %info command
:param message: message sent
:type message: :class:`discord.Message`
:param args: arguments of the command
:type args: list[:class:`str`]
"""
await message.channel.send(f"```Discord Analyst v{VERSION} started at {t0:%Y-%m-%d %H:%M}```")
COMMANDS = {
"%help": help.compute,
"%emotes": emotes.compute,
"%info": info
}
@client.event
async def on_ready():
"""
Called when client is connected
"""
# Change status
await client.change_presence(
activity=discord.Game(f"v{VERSION} | %help"),
status=discord.Status.online
)
# Debug connected guilds
print(f'{client.user} v{VERSION} has connected to Discord\nto the following guilds:')
for guild in client.guilds:
print(f'- {guild.name}(id: {guild.id})')
@client.event
async def on_message(message):
"""
Called when a message is sent to any channel on any guild
:param message: message sent
:type message: discord.Message
"""
# Ignore self messages
if message.author == client.user:
return
args = message.content.split(" ")
if len(args) < 1 or args[0] not in COMMANDS:
return
debug(message, f"command '{message.content}'")
# Check if bot can respond on current channel or DM user
permissions = message.channel.permissions_for(message.guild.me)
if not permissions.send_messages:
debug(message, f"missing 'send_messages' permission")
await message.author.create_dm()
await message.author.dm_channel.send(
f"Hi, this bot doesn\'t have the permission to send a message to"
f" #{message.channel} in server '{message.guild}'")
return
# Redirect to the correct command
await COMMANDS[args[0]](message, args)
print(f"Current PID: {os.getpid()}")
client.run(token)
-391
View File
@@ -1,391 +0,0 @@
from datetime import datetime
import discord
import re
# Custom libs
import help
from utils import debug, aggregate, no_duplicate
# CONSTANTS
CHUNK_SIZE = 10000
# MAIN
async def compute(message, args):
"""
Computes the %emotes command
:param message: message sent
:type message: :class:`discord.Message`
:param args: arguments of the command
:type args: list[:class:`str`]
"""
guild = message.guild
# If "%emotes help" redirect to "%help emotes"
if len(args) > 1 and args[1] == "help":
await help.compute(message, ["%help", "emotes"])
return
# Create emotes dict from custom emojis of the guild
emotes = {str(emoji): Emote(emoji) for emoji in guild.emojis}
# Get selected channels or all of them if no channel arguments
channels = no_duplicate(message.channel_mentions)
full = len(channels) == 0
if full:
channels = guild.text_channels
# Get selected members
members = no_duplicate(message.mentions)
# Start computing data
async with message.channel.typing():
nm = 0 # number of messages treated
nmm = 0 # number of impacted messages
nc = 0 # number of channel treated
t0 = datetime.now()
# Show custom progress message and keep it to update it later
progress = await message.channel.send(f"```starting analysis...```")
# Analyse every channel selected
for channel in channels:
nm1, nmm1 = await analyse_channel(channel, emotes, members, progress, nm, nc)
# If treatment was successful, increase numbers
if nm1 >= 0:
nm += nm1
nmm += nmm1
nc += 1
# Delete custom progress message
await progress.delete()
# Display results
await tell_results(get_intro(emotes, full, channels, members, nmm, nc),
emotes, message.channel, nmm, allow_unused=full and len(members) == 0, show_life=False)
dt = (datetime.now() - t0).total_seconds()
debug(message, f"response sent {dt} s -> {nm / dt} m/s")
# CLASSES
class Emote:
"""
Custom class to store emotes data
:param emoji: source discord emoji
:type emoji: :class:`discord.Emoji`
:ivar emoji: discord emoji
:vartype emoji: discord.Emoji
:ivar usages: number of usages in messages
:vartype usages: int
:ivar reactions: number of reactions below messages (always 0 if member specified)
:vartype reactions: int
:ivar last_used: date of last use
:vartype last_used: datetime
"""
def __init__(self, emoji):
self.emoji = emoji
self.usages = 0
self.reactions = 0
self.last_used = None
def update_use(self, date):
"""
Update last use date if more recent
:param date: date of use
:type date: datetime
"""
if self.last_used is None or date > self.last_used:
self.last_used = date
def used(self):
"""
:return: If this emote was used ever
:rtype: bool
"""
return self.usages > 0 or self.reactions > 0
def score(self):
"""
:return: Score of this emote to be sorted
:rtype: float
"""
# Score is compose of usages + reactions
# When 2 emotes have the same score, the days since last use is stored in the digits
# (more recent first)
return self.usages + self.reactions + 1 / (100000 * (self.use_days() + 1))
def life_days(self):
"""
:return: Days since creation
:rtype: int
"""
return (datetime.today() - self.emoji.created_at).days
def use_days(self):
"""
:return: Days since last use
:rtype: int
"""
# If never used, use creation date instead
if self.last_used is None:
return self.life_days()
else:
return (datetime.today() - self.last_used).days
# ANALYSIS
async def analyse_channel(channel, emotes, members, progress, nm0, nc):
"""
Analyses a given channel
:param channel: channel to analyse
:type channel: discord.TextChannel
:param emotes: known emotes
:type emotes: dict[str, Emote]
:param members: selected members or empty for all
:type members: list[discord.Member]
:param progress: custom progress message (already sent)
:type progress: discord.Message
:param nm0: number of already analysed messages
:type nm0: int
:param nc: number of already analysed channels
:type nc: int
:return: nm, nc (-1 on exception)
:rtype: int, int
"""
nm = 0
nmm = 0
try:
messages = [None]
while len(messages) >= CHUNK_SIZE or messages[-1] is None:
messages = await channel.history(limit=CHUNK_SIZE, before=messages[-1]).flatten()
for m in messages:
# If author is not bot or included in the selection (empty list is all)
if not m.author.bot and (len(members) == 0 or m.author in members):
# Find all emotes un the current message in the form "<:emoji:123456789>"
# Filter for known emotes
found = [name for name in re.findall(r"(<:\w+:\d+>)", m.content) if name in emotes]
# For each emote, update its usage
for name in found:
emotes[name].usages += 1
emotes[name].update_use(m.created_at)
# Count this message as impacted
nmm += 1
# For each reaction of this message, test if known emote and update when it's the case
for reaction in m.reactions:
name = str(reaction.emoji)
# reaction.emoji can be only str, we don't want that
if not (isinstance(reaction.emoji, str)) and name in emotes:
if len(members) == 0:
emotes[name].reactions += reaction.count
emotes[name].update_use(m.created_at)
""" else:
users = await reaction.users().flatten()
for member in members:
if member in users:
emotes[name].reactions += 1
emotes[name].update_use(m.created_at)"""
nm += len(messages)
await progress.edit(content=f"```{nm0 + nm:,} messages and {nc} channels analysed```")
return nm, nmm
except discord.errors.HTTPException:
# When an exception occurs (like Forbidden) sent -1
return -1, -1
# RESULTS
async def tell_results(intro, emotes, channel, nmm, *, allow_unused, show_life):
"""
Send the full results message
:param intro: introduction sentence (from get_intro)
:type intro: str
:param emotes: known emotes
:type emotes: dict[str, Emote]
:param channel: where to send the message (original message channel)
:type channel: discord.TextChannel
:param nmm: number of impacted messages
:type nmm: int
:param allow_unused: show unused emotes
:type allow_unused: bool
:param show_life: show emotes life span
:type show_life: bool
"""
names = [name for name in emotes]
names.sort(key=lambda name: emotes[name].score(), reverse=True)
res = [intro]
res += [
f"{get_place(names.index(name))} {name} - "
f"{get_usage(emotes[name])}"
f"{get_reactions(emotes[name])}"
f"{get_life(emotes[name], show_life)}"
f"{get_last_used(emotes[name])}"
for name in names if allow_unused or emotes[name].used()]
res += [get_total(emotes, nmm)]
response = ""
for r in res:
if len(response + "\n" + r) > 2000:
await channel.send(response)
response = ""
response += "\n" + r
if len(response) > 0:
await channel.send(response)
def get_intro(emotes, full, channels, members, nmm, nc):
"""
Get the introduction sentence of the response
:param emotes: known emotes
:type emotes: dict[str, Emote]
:param full: if the scan contained all channels
:type full: bool
:param channels: channels selected (ignored if full is True)
:type channels: list[discord.TextChannel]
:param members: members selected (empty for all)
:type members: list[discord.Member]
:param nmm: number of messages impacted
:type nmm: int
:param nc: number of channels analysed
:type nc: int
:return: the correct intro sentence
:rtype: str
"""
# Show all data (members, channels) when it's less than 5 units
if len(members) == 0:
# Full scan of the server
if full:
return f"{len(emotes)} emotes in this server ({nc} channels, {nmm:,} messages):"
elif len(channels) < 5:
return f"{aggregate([c.mention for c in channels])} emotes usage in {nmm:,} messages:"
else:
return f"These {len(channels)} channels emotes usage in {nmm:,} messages:"
elif len(members) < 5:
if full:
return f"{aggregate([m.mention for m in members])} emotes usage in {nmm:,} messages:"
elif len(channels) < 5:
return f"{aggregate([m.mention for m in members])} on {aggregate([c.mention for c in channels])} " \
f"emotes usage in {nmm:,} messages:"
else:
return f"{aggregate([m.mention for m in members])} on these {len(channels)} channels " \
f"emotes usage in {nmm:,} messages:"
else:
if full:
return f"These {len(members)} members emotes usage in {nmm:,} messages:"
elif len(channels) < 5:
return f"These {len(members)} members on {aggregate([c.mention for c in channels])} " \
f"emotes usage in {nmm:,} messages:"
else:
return f"These {len(members)} members on these {len(channels)} channels " \
f"emotes usage in {nmm:,} messages:"
def get_place(i):
"""
Get the correct rank displayed (1st to 3rd have an emoji)
:param i: index
:type i: int
:return: rank string
:rtype: str
"""
if i == 0:
return ":first_place:"
if i == 1:
return ":second_place:"
if i == 2:
return ":third_place:"
return f"**#{i + 1}**"
def get_usage(emote):
"""
Get the correct usage displayed
:type emote: Emote
:return: usage description
:rtype: str
"""
if emote.usages == 0 and emote.reactions == 0:
return "never used "
elif emote.usages == 1:
return "1 time "
else:
return f"{emote.usages:,} times "
def get_reactions(emote):
"""
Get the correct reactions displayed
:return: reactions description
:rtype: str
"""
if emote.reactions == 0:
return ""
elif emote.reactions == 1:
return "and 1 reaction "
else:
return f"and {emote.reactions:,} reactions "
def get_life(emote, show_life):
"""
Get the correct life span displayed
:type emote: Emote
:param show_life: disable if False
:type show_life: bool
:return: life description
:rtype: str
"""
if not show_life:
return ""
else:
return f"(in {emote.life_days()} days) "
def get_last_used(emote):
"""
Get the correct "last used" displayed
:type emote: Emote
:return: last usage description
:rtype: str
"""
if emote.usages == 0 and emote.reactions == 0:
return ""
elif emote.use_days() == 0:
return "(last used today)"
elif emote.use_days() == 1:
return "(last used yesterday)"
else:
return f"(last used {emote.use_days()} days ago)"
def get_total(emotes, nmm):
"""
Get the total of all emotes used
:param emotes: known emotes
:type emotes: dict[str, Emote]
:param nmm: number of messages impacted
:type nmm: int
:return: total sentence
:rtype: str
"""
nu = 0
nr = 0
for name in emotes:
nu += emotes[name].usages
nr += emotes[name].reactions
if nr > 0:
return f"Total: {nu:,} times ({nu / nmm:.4f} / message) and {nr:,} reactions"
else:
return f"Total: {nu:,} times ({nu / nmm:.4f} / message)"
-31
View File
@@ -1,31 +0,0 @@
async def compute(message, args):
"""
Computes the %help command
:param message: message sent
:type message: discord.Message
:param args: arguments of the command
:type args: list[str]
"""
# Select correct response to send
response = "Discord Analyst commands:\n" \
"```\n" \
"%help (command) : Info on commands\n" \
"%info : This bot info\n" \
"%emotes : Emotes analysis\n" \
"```"
if len(args) > 1 and args[1] == "emotes":
response = "Emotes Analysis:\n" \
"```\n" \
"%emotes : Rank emotes by their usage\n" \
"%emotes @user : // for a specific user\n" \
"%emotes #channel : // for a specific channel\n" \
"(Add more @user or #channel to be more selective)\n" \
"```"
await message.channel.send(response)
Regular → Executable
+1
View File
@@ -1,2 +1,3 @@
discord.py
python-dotenv
git+git://github.com/Klemek/miniscord.git
View File
+5
View File
@@ -0,0 +1,5 @@
from .emote import Emote, get_emote_dict
from .frequency import Frequency
from .composition import Composition
from .presence import Presence
from .counter import Counter
+71
View File
@@ -0,0 +1,71 @@
from typing import List
from collections import defaultdict
from utils import percent, top_key, plural, precise, val_sum
class Composition:
def __init__(self):
self.total_characters = 0
self.plain_text = 0
self.emote_msg = 0
self.emote_only = 0
self.emotes = defaultdict(int)
self.edited = 0
self.everyone = 0
self.answers = 0
self.images = 0
self.tts = 0
self.mentions = 0
self.mention_msg = 0
self.links = 0
self.link_msg = 0
self.spoilers = 0
def to_string(self, msg_count: int) -> List[str]:
ret = []
ret += [
f"- **avg. characters / message**: {self.total_characters/msg_count:.2f}"
]
if self.plain_text > 0:
ret += [
f"- **plain text messages**: {self.plain_text:,} ({percent(self.plain_text/msg_count)})"
]
if self.edited > 0:
ret += [
f"- **edited messages**: {self.edited:,} ({percent(self.edited/msg_count)})"
]
if self.everyone > 0:
ret += [
f"- **@\u200beveryone**: {self.everyone:,} ({percent(self.everyone/msg_count)})"
]
if self.mentions > 0:
ret += [
f"- **mentions**: {self.mentions:,} (in {percent(self.mention_msg/msg_count)} of msg, avg. {precise(self.mentions/msg_count)}/msg)",
]
if self.answers > 0:
ret += [
f"- **answers**: {self.answers:,} ({percent(self.answers/msg_count)})"
]
total_emotes = val_sum(self.emotes)
if total_emotes > 0:
top_emote = top_key(self.emotes)
ret += [
f"- **emojis**: {total_emotes:,} (in {percent(self.emote_msg/msg_count)} of msg, avg. {precise(total_emotes/msg_count)}/msg)",
f"- **most used emoji**: {top_emote} ({plural(self.emotes[top_emote], 'time')}, {percent(self.emotes[top_emote]/total_emotes)})",
]
if self.emote_only > 0:
ret += [
f"- **emoji-only messages**: {self.emote_only:,} ({percent(self.emote_only/msg_count)})"
]
if self.images > 0:
ret += [f"- **images**: {self.images:,} ({percent(self.images/msg_count)})"]
if self.links > 0:
ret += [f"- **links**: {self.links:,} ({percent(self.link_msg/msg_count)})"]
if self.spoilers > 0:
ret += [
f"- **spoilers**: {self.spoilers:,} ({percent(self.spoilers/msg_count)})"
]
if self.tts > 0:
ret += [f"- **tts messages**: {self.tts:,} ({percent(self.tts/msg_count)})"]
return ret
+63
View File
@@ -0,0 +1,63 @@
from typing import Optional, Callable
from datetime import datetime
from collections import defaultdict
# Custom libs
from utils import plural, from_now, percent, val_sum, top_key
class Counter:
def __init__(self):
self.usages = defaultdict(int)
self.last_used = None
def update_use(self, count: int, date: datetime, item: int = 0):
self.usages[item] += count
if self.last_used is None or date > self.last_used:
self.last_used = date
def score(self) -> float:
# Score is compose of usages + reactions
# When 2 emotes have the same score,
# the days since last use is stored in the digits
# (more recent first)
return self.all_usages() + 1 / (
100000 * ((datetime.today() - self.last_used).days + 1)
)
def all_usages(self) -> int:
return val_sum(self.usages)
def to_string(
self,
i: int,
name: str,
*,
total_usage: int,
counted: str = "time",
transform: Optional[Callable[[int], str]] = None,
) -> str:
# place
output = ""
if i == 0:
output += ":first_place:"
elif i == 1:
output += ":second_place:"
elif i == 2:
output += ":third_place:"
else:
output += f"**#{i + 1}**"
sum = val_sum(self.usages)
output += f" {name} - {plural(sum, counted)} ({percent(sum/total_usage)}) (last {from_now(self.last_used)})"
top_item = top_key(self.usages)
if top_item != 0 and transform is not None:
if self.usages[top_item] == sum:
output += f" (all{transform(top_item)})"
else:
output += f" ({self.usages[top_item]:,}{transform(top_item)}, {percent(self.usages[top_item]/sum)})"
return output
@staticmethod
def total(d: dict) -> int:
return sum([val_sum(counter.usages) for counter in d.values()])
+106
View File
@@ -0,0 +1,106 @@
from typing import List, Optional, Dict
from datetime import datetime
from collections import defaultdict
import discord
# Custom libs
from utils import mention, plural, from_now, top_key, percent
class Emote:
"""
Custom class to store emotes data
"""
def __init__(self, emoji: Optional[discord.Emoji] = None):
self.emoji = emoji
self.usages = 0
self.reactions = 0
self.last_used = None
self.members = defaultdict(int)
def update_use(self, date: datetime, members_id: List[int]):
"""
Update last use date if more recent and last member
"""
if self.last_used is None or date > self.last_used:
self.last_used = date
for member_id in members_id:
self.members[member_id] += 1
def used(self) -> bool:
return self.usages > 0 or self.reactions > 0
def score(self, *, usage_weight: int = 1, react_weight: int = 1) -> float:
# Score is compose of usages + reactions
# When 2 emotes have the same score,
# the days since last use is stored in the digits
# (more recent first)
return (
self.usages * usage_weight
+ self.reactions * react_weight
+ 1 / (100000 * (self.use_days() + 1))
)
def life_days(self) -> int:
return (datetime.today() - self.emoji.created_at).days
def use_days(self) -> int:
# If never used, use creation date instead
if self.last_used is None:
return self.life_days()
else:
return (datetime.today() - self.last_used).days
def get_top_member(self) -> int:
return top_key(self.members)
def to_string(
self,
i: int,
name: str,
*,
total_usage: int,
total_react: int,
show_life: bool,
show_members: bool,
) -> str:
# place
output = ""
if i == 0:
output += ":first_place:"
elif i == 1:
output += ":second_place:"
elif i == 2:
output += ":third_place:"
else:
output += f"**#{i + 1}**"
output += f" {name} - "
if not self.used():
output += "never used"
else:
if self.usages > 0:
output += f"{plural(self.usages, 'time')} ({percent(self.usages/total_usage)})"
if self.usages > 0 and self.reactions > 0:
output += " and "
if self.reactions >= 1:
output += f"{plural(self.reactions, 'reaction')} ({percent(self.reactions/total_react)})"
output += f" (last used {from_now(self.last_used)})"
if show_members:
top_member = self.get_top_member()
total = self.usages + self.reactions
if total == self.members[top_member]:
output += f" (all by {mention(top_member)})"
else:
output += f" ({self.members[top_member]} by {mention(top_member)}, {percent(self.members[top_member]/total)})"
if show_life and not self.default:
output += f" (in {plural(self.life_days(), 'day')})"
return output
def get_emote_dict(guild: discord.Guild) -> Dict[str, Emote]:
emotes = defaultdict(Emote)
for emoji in guild.emojis:
emotes[str(emoji)] = Emote(emoji)
return emotes
+44
View File
@@ -0,0 +1,44 @@
from typing import List
from datetime import timedelta
import calendar
from utils import str_date, str_datetime, from_now, plural, percent, precise, top_key
class Frequency:
def __init__(self):
self.dates = []
self.longest_break = timedelta(seconds=0)
self.longest_break_start = None
self.week = {i: 0 for i in range(7)}
self.day = {i: 0 for i in range(24)}
self.busiest_day = None
self.busiest_day_count = 0
self.busiest_hour = None
self.busiest_hour_count = 0
def to_string(self) -> List[str]:
delta = self.dates[-1] - self.dates[0]
total_msg = len(self.dates)
busiest_weekday = top_key(self.week)
busiest_hour = top_key(self.day)
n_weekdays = delta.days // 7
if (
self.dates[0].weekday() <= busiest_weekday
and self.dates[-1].weekday() >= busiest_weekday
):
n_weekdays += 1
n_hours = delta.days
if self.dates[0].hour <= busiest_hour and self.dates[-1].hour >= busiest_hour:
n_hours += 1
return [
f"- **earliest message**: {str_datetime(self.dates[0])} ({from_now(self.dates[0])})",
f"- **latest message**: {str_datetime(self.dates[-1])} ({from_now(self.dates[-1])})",
f"- **messages/day**: {precise(total_msg/delta.days, precision=3)}",
f"- **busiest day of week**: {calendar.day_name[busiest_weekday]} (~{precise(self.week[busiest_weekday]/n_weekdays, precision=3)} msg, {percent(self.week[busiest_weekday]/total_msg)})",
f"- **busiest day ever**: {str_date(self.busiest_day)} ({from_now(self.busiest_day)}) ({self.busiest_day_count} msg)",
f"- **messages/hour**: {precise(total_msg*3600/delta.total_seconds(), precision=3)}",
f"- **busiest hour of day**: {busiest_hour:0>2}:00 (~{precise(self.day[busiest_hour]/n_hours, precision=3)} msg, {percent(self.day[busiest_hour]/total_msg)})",
f"- **busiest hour ever**: {str_datetime(self.busiest_hour)} ({from_now(self.busiest_hour)}) ({self.busiest_hour_count} msg)",
f"- **longest break**: {plural(round(self.longest_break.total_seconds()/3600), 'hour')} ({plural(self.longest_break.days,'day')}) from {str_datetime(self.longest_break_start)} ({from_now(self.longest_break_start)})",
]
+98
View File
@@ -0,0 +1,98 @@
from typing import List, Optional
from collections import defaultdict
from utils import mention, channel_mention, plural, percent, top_key, val_sum
class Presence:
def __init__(self):
self.messages = defaultdict(int)
self.reactions = defaultdict(int)
self.used_reaction = defaultdict(int)
self.channel_usage = defaultdict(int)
self.channel_total = defaultdict(int)
self.mentions = defaultdict(int)
self.mention_others = defaultdict(int)
self.mention_count = defaultdict(int)
def to_string(
self,
msg_count: int,
total_msg: int,
*,
chan_count: Optional[int],
show_top_channel: bool,
member_specific: bool,
) -> List[str]:
ret = []
if chan_count is None:
type = "server's"
elif chan_count == 1:
type = "channel's"
else:
type = "channels'"
if member_specific:
ret += [
f"- **messages**: {msg_count:,} ({percent(msg_count/total_msg)} of {type})"
]
else:
top_member = top_key(self.messages)
ret += [
f"- **top messages**: {mention(top_member)} ({self.messages[top_member]:,} msg, {percent(self.messages[top_member]/val_sum(self.messages))})"
]
if show_top_channel:
top_channel = top_key(self.channel_usage)
channel_sum = val_sum(self.channel_usage)
found_in = sorted(
self.channel_usage,
key=lambda k: self.channel_usage[k] / self.channel_total[k],
)[-1]
ret += [
f"- **most visited channel**: {channel_mention(top_channel)} ({self.channel_usage[top_channel]:,} msg, {percent(self.channel_usage[top_channel]/channel_sum)})",
]
if member_specific:
ret += [
f"- **most contributed channel**: {channel_mention(found_in)} ({self.channel_usage[found_in]:,} msg, {percent(self.channel_usage[found_in]/self.channel_total[found_in])} of {type})"
]
if member_specific:
if len(self.mentions) > 0:
top_mention = top_key(self.mentions)
mention_sum = val_sum(self.mentions)
ret += [
f"- **was mentioned**: {plural(mention_sum, 'time')} ({percent(mention_sum/val_sum(self.mention_count))} of {type})",
f"- **mostly mentioned by**: {mention(top_mention)} ({plural(self.mentions[top_mention], 'time')}, {percent(self.mentions[top_mention]/mention_sum)})",
]
if len(self.mention_others) > 0:
top_mention = top_key(self.mention_others)
mention_sum = val_sum(self.mention_others)
if member_specific:
ret += [
f"- **mentioned others**: {plural(mention_sum, 'time')} ({percent(mention_sum/val_sum(self.mention_count))} of {type})",
f"- **mostly mentioned**: {mention(top_mention)} ({plural(self.mention_others[top_mention], 'time')}, {percent(self.mention_others[top_mention]/mention_sum)})",
]
else:
top_member = top_key(self.mention_count)
ret += [
f"- **mentioned**: {plural(mention_sum, 'time')} ({mention(top_member)}, {percent(self.mention_count[top_member]/val_sum(self.mention_count))})",
f"- **top mentions**: {mention(top_member)} ({plural(self.mention_count[top_member], 'time')}, {percent(self.mention_count[top_member]/val_sum(self.mention_count))})",
f"- **most mentioned**: {mention(top_mention)} ({plural(self.mention_others[top_mention], 'time')}, {percent(self.mention_others[top_mention]/mention_sum)})",
]
if len(self.reactions) > 0:
total_used = val_sum(self.reactions)
top_reaction = top_key(self.reactions)
ret += [
f"- **reactions**: {plural(total_used, 'time')}",
f"- **most used reaction**: {top_reaction} ({plural(self.reactions[top_reaction], 'time')}, {percent(self.reactions[top_reaction]/total_used)})",
]
if member_specific:
ret[
-2
] += f" ({percent(total_used/val_sum(self.used_reaction))} of {type})"
else:
top_member = top_key(self.used_reaction)
ret.insert(
-1,
f"- **top reactions**: {mention(top_member)} ({plural(self.used_reaction[top_member], 'time')}, {percent(self.used_reaction[top_member]/val_sum(self.used_reaction))})",
)
return ret
+3
View File
@@ -0,0 +1,3 @@
from .message_log import MessageLog
from .channel_logs import ChannelLogs
from .guild_logs import GuildLogs, ALREADY_RUNNING, CANCELLED
+80
View File
@@ -0,0 +1,80 @@
from typing import Union, Tuple
import discord
from . import MessageLog
from utils import FakeMessage
CHUNK_SIZE = 1000
FORMAT = 3
class ChannelLogs:
def __init__(self, channel: Union[discord.TextChannel, dict]):
if isinstance(channel, discord.TextChannel):
self.id = channel.id
self.name = channel.name
self.last_message_id = None
self.format = FORMAT
self.messages = []
elif isinstance(channel, dict):
self.format = channel["format"] if "format" in channel else None
if not self.is_format():
return
self.id = int(channel["id"])
self.name = channel["name"]
self.last_message_id = (
int(channel["last_message_id"])
if channel["last_message_id"] is not None
else None
)
self.messages = [MessageLog(message) for message in channel["messages"]]
def is_format(self):
return self.format == FORMAT
async def load(self, channel: discord.TextChannel) -> Tuple[int, int]:
self.name = channel.name
self.channel = channel
try:
if self.last_message_id is not None: # append
while self.last_message_id != channel.last_message_id:
async for message in channel.history(
limit=CHUNK_SIZE,
after=FakeMessage(self.last_message_id),
oldest_first=True,
):
self.last_message_id = message.id
m = MessageLog(message)
await m.load(message)
self.messages.insert(0, m)
yield len(self.messages), False
else: # first load
last_message_id = None
done = 0
sanity_check = len(await channel.history(limit=1).flatten())
if sanity_check == 1:
while done >= CHUNK_SIZE or last_message_id is None:
done = 0
async for message in channel.history(
limit=CHUNK_SIZE,
before=FakeMessage(last_message_id)
if last_message_id is not None
else None,
oldest_first=False,
):
done += 1
last_message_id = message.id
m = MessageLog(message)
await m.load(message)
self.messages += [m]
yield len(self.messages), False
self.last_message_id = channel.last_message_id
except discord.errors.HTTPException:
return # When an exception occurs (like Forbidden)
yield len(self.messages), True
def dict(self) -> dict:
channel = dict(self.__dict__)
channel.pop("channel", None)
channel["messages"] = [message.dict() for message in self.messages]
return channel
+213
View File
@@ -0,0 +1,213 @@
from typing import List, Tuple
import os
import discord
import json
import gzip
from datetime import datetime
import logging
from . import ChannelLogs
from utils import code_message, delta, deltas
LOG_DIR = "logs"
current_analysis = []
ALREADY_RUNNING = -100
CANCELLED = -200
class GuildLogs:
def __init__(self, guild: discord.Guild):
self.guild = guild
self.log_file = os.path.join(LOG_DIR, f"{guild.id}.logz")
self.channels = {}
def dict(self) -> dict:
return {id: self.channels[id].dict() for id in self.channels}
def check_cancelled(self) -> bool:
return self.log_file not in current_analysis
async def load(
self,
progress: discord.Message,
target_channels: List[discord.TextChannel] = [],
*,
fast: bool,
fresh: bool,
) -> Tuple[int, int]:
global current_analysis
if self.log_file in current_analysis:
return ALREADY_RUNNING, 0
current_analysis += [self.log_file]
t00 = datetime.now()
# read logs
if not os.path.exists(LOG_DIR):
os.mkdir(LOG_DIR)
if os.path.exists(self.log_file):
channels = {}
try:
gziped_data = None
await code_message(progress, "Reading saved history (1/4)...")
t0 = datetime.now()
with open(self.log_file, mode="rb") as f:
gziped_data = f.read()
logging.info(f"log {self.guild.id} > read in {delta(t0):,}ms")
if self.check_cancelled():
return CANCELLED, 0
await code_message(progress, "Reading saved history (2/4)...")
t0 = datetime.now()
json_data = gzip.decompress(gziped_data)
logging.info(
f"log {self.guild.id} > gzip decompress in {delta(t0):,}ms"
)
if self.check_cancelled():
return CANCELLED, 0
await code_message(progress, "Reading saved history (3/4)...")
t0 = datetime.now()
channels = json.loads(json_data)
logging.info(f"log {self.guild.id} > json parse in {delta(t0):,}ms")
if self.check_cancelled():
return CANCELLED, 0
await code_message(progress, "Reading saved history (4/4)...")
t0 = datetime.now()
self.channels = {int(id): ChannelLogs(channels[id]) for id in channels}
# remove invalid format
self.channels = {
id: self.channels[id]
for id in self.channels
if self.channels[id].is_format()
}
logging.info(f"log {self.guild.id} > loaded in {delta(t0):,}ms")
except json.decoder.JSONDecodeError:
logging.error(f"log {self.guild.id} > invalid JSON")
except IOError:
logging.error(f"log {self.guild.id} > cannot read")
else:
fast = False
total_msg = 0
total_chan = 0
if fast:
if len(target_channels) == 0:
total_msg = sum(
[len(channel.messages) for channel in self.channels.values()]
)
total_chan = len(self.channels)
else:
target_channels_id = [channel.id for channel in target_channels]
total_msg = sum(
[
len(channel.messages)
for channel in self.channels.values()
if channel.id in target_channels_id
]
)
total_chan = len(target_channels)
else:
# load channels
t0 = datetime.now()
if len(target_channels) == 0:
target_channels = self.guild.text_channels
loading_new = 0
queried_msg = 0
total_chan = 0
max_chan = len(target_channels)
if self.check_cancelled():
return CANCELLED, 0
await code_message(
progress,
f"Reading new history...\n0 messages in 0/{max_chan:,} channels\n(this might take a while)",
)
for channel in target_channels:
if channel.id not in self.channels or fresh:
loading_new += 1
self.channels[channel.id] = ChannelLogs(channel)
start_msg = len(self.channels[channel.id].messages)
count = 0
async for count, done in self.channels[channel.id].load(channel):
if count > 0:
tmp_queried_msg = queried_msg + count - start_msg
tmp_msg = total_msg + count
warning_msg = "(this might take a while)"
if len(target_channels) > 5 and loading_new > 5:
warning_msg = "(most channels are new, this might take a looong while)"
elif loading_new > 0:
warning_msg = (
"(some channels are new, this might take a long while)"
)
if self.check_cancelled():
return CANCELLED, 0
await code_message(
progress,
f"Reading new history...\n{tmp_msg:,} messages in {total_chan + 1:,}/{max_chan:,} channels ({round(tmp_queried_msg/deltas(t0)):,}m/s)\n{warning_msg}",
)
if done:
total_chan += 1
total_msg += len(self.channels[channel.id].messages)
queried_msg += count - start_msg
logging.info(
f"log {self.guild.id} > queried in {delta(t0):,}ms -> {queried_msg / deltas(t0):,.3f} m/s"
)
# write logs
real_total_msg = sum(
[len(channel.messages) for channel in self.channels.values()]
)
real_total_chan = len(self.channels)
if self.check_cancelled():
return CANCELLED, 0
await code_message(
progress,
f"Saving history (1/3)...\n{real_total_msg:,} messages in {real_total_chan:,} channels",
)
t0 = datetime.now()
json_data = bytes(json.dumps(self.dict()), "utf-8")
logging.info(
f"log {self.guild.id} > json dump in {delta(t0):,}ms -> {real_total_msg / deltas(t0):,.3f} m/s"
)
if self.check_cancelled():
return CANCELLED, 0
await code_message(
progress,
f"Saving history (2/3)...\n{real_total_msg:,} messages in {real_total_chan:,} channels",
)
t0 = datetime.now()
gziped_data = gzip.compress(json_data)
logging.info(
f"log {self.guild.id} > gzip in {delta(t0):,}ms -> {real_total_msg / deltas(t0):,.3f} m/s"
)
if self.check_cancelled():
return CANCELLED, 0
await code_message(
progress,
f"Saving history (3/3)...\n{real_total_msg:,} messages in {real_total_chan:,} channels",
)
t0 = datetime.now()
with open(self.log_file, mode="wb") as f:
f.write(gziped_data)
logging.info(
f"log {self.guild.id} > saved in {delta(t0):,}ms -> {real_total_msg / deltas(t0):,.3f} m/s"
)
if self.check_cancelled():
return CANCELLED, 0
await code_message(
progress,
f"Analysing...\n{total_msg:,} messages in {total_chan:,} channels",
)
logging.info(f"log {self.guild.id} > TOTAL TIME: {delta(t00):,}ms")
current_analysis.remove(self.log_file)
return total_msg, total_chan
@staticmethod
async def cancel(client: discord.client, message: discord.Message, *args: str):
logs = GuildLogs(message.guild)
if logs.log_file in current_analysis:
current_analysis.remove(logs.log_file)
else:
await message.channel.send(
f"No analysis are currently running on this server", reference=message
)
+85
View File
@@ -0,0 +1,85 @@
from typing import Union
import discord
from datetime import datetime
from utils import is_extension
IMAGE_FORMAT = [".gif", ".gifv", ".png", ".jpg", ".jpeg", ".bmp"]
EMBED_IMAGES = ["image", "gifv"]
class MessageLog:
def __init__(self, message: Union[discord.Message, dict]):
if isinstance(message, discord.Message):
self.id = message.id
self.created_at = message.created_at
self.edited_at = message.edited_at
self.author = message.author.id
self.pinned = message.pinned
self.mention_everyone = message.mention_everyone
self.tts = message.tts
self.bot = message.author.bot or message.author.system
self.content = message.content
self.mentions = message.raw_mentions
if message.reference is not None:
self.reference = message.reference.message_id
if message.reference.resolved is not None:
try:
self.mentions += [message.reference.resolved.author.id]
except AttributeError:
pass
else:
self.reference = None
self.role_mentions = message.raw_role_mentions
self.channel_mentions = message.raw_channel_mentions
self.image = False
self.attachment = len(message.attachments) > 0
self.embed = len(message.embeds) > 0
for attachment in message.attachments:
if is_extension(attachment.filename, IMAGE_FORMAT):
self.image = True
break
else:
for embed in message.embeds:
if embed.type in EMBED_IMAGES:
self.image = True
break
self.reactions = {}
elif isinstance(message, dict):
self.id = int(message["id"])
self.created_at = datetime.fromisoformat(message["created_at"])
self.edited_at = (
datetime.fromisoformat(message["edited_at"])
if message["edited_at"] is not None
else None
)
self.author = int(message["author"])
self.pinned = message["pinned"]
self.mention_everyone = message["mention_everyone"]
self.tts = message["tts"]
self.reference = (
int(message["reference"]) if message["reference"] is not None else None
)
self.bot = message["bot"]
self.content = message["content"]
self.mentions = [int(m) for m in message["mentions"]]
self.role_mentions = [int(m) for m in message["role_mentions"]]
self.channel_mentions = [int(m) for m in message["channel_mentions"]]
self.image = message["image"]
self.embed = message["embed"]
self.attachment = message["attachment"]
self.reactions = message["reactions"]
async def load(self, message: discord.Message):
for reaction in message.reactions:
self.reactions[str(reaction.emoji)] = []
async for user in reaction.users():
self.reactions[str(reaction.emoji)] += [user.id]
def dict(self) -> dict:
message = dict(self.__dict__)
message["created_at"] = self.created_at.isoformat()
message["edited_at"] = (
self.edited_at.isoformat() if self.edited_at is not None else None
)
return message
+100
View File
@@ -0,0 +1,100 @@
from miniscord import Bot
import logging
from utils import emojis
from scanners import (
EmotesScanner,
FullScanner,
FrequencyScanner,
CompositionScanner,
PresenceScanner,
MentionsScanner,
MentionedScanner,
MessagesScanner,
ChannelsScanner,
ReactionsScanner,
)
from logs import GuildLogs
logging.basicConfig(
format="[%(asctime)s][%(levelname)s][%(module)s] %(message)s", level=logging.INFO
)
emojis.load_emojis()
bot = Bot(
"Discord Analyst",
"1.9",
alias="%",
)
bot.log_calls = True
bot.register_command(
"(cancel|stop)",
GuildLogs.cancel,
"cancel: stop current analysis",
"```\n" + "%cancel: Stop current analysis\n" + "```",
)
bot.register_command(
"mentioned",
lambda *args: MentionedScanner().compute(*args),
"mentioned: rank specific user mentions by their usage",
MentionedScanner.help(),
)
bot.register_command(
"(mentions?)",
lambda *args: MentionsScanner().compute(*args),
"mentions: rank mentions by their usage",
MentionsScanner.help(),
)
bot.register_command(
"(emojis?|emotes?)",
lambda *args: EmotesScanner().compute(*args),
"emojis: rank emojis by their usage",
EmotesScanner.help(),
)
bot.register_command(
"(react(ions?)?)",
lambda *args: ReactionsScanner().compute(*args),
"react: rank users by their reactions",
ReactionsScanner.help(),
)
bot.register_command(
"(channels?|chan)",
lambda *args: ChannelsScanner().compute(*args),
"chan: rank channels by their messages",
ChannelsScanner.help(),
)
bot.register_command(
"(messages?|msg)",
lambda *args: MessagesScanner().compute(*args),
"msg: rank users by their messages",
MessagesScanner.help(),
)
bot.register_command(
"pres(ence)?",
lambda *args: PresenceScanner().compute(*args),
"pres: presence analysis",
PresenceScanner.help(),
)
bot.register_command(
"compo(sition)?",
lambda *args: CompositionScanner().compute(*args),
"compo: composition analysis",
CompositionScanner.help(),
)
bot.register_command(
"freq(ency)?",
lambda *args: FrequencyScanner().compute(*args),
"freq: frequency analysis",
FrequencyScanner.help(),
)
bot.register_command(
"(full|scan)",
lambda *args: FullScanner().compute(*args),
"scan: full analysis",
FullScanner.help(),
)
bot.start()
File diff suppressed because one or more lines are too long
+10
View File
@@ -0,0 +1,10 @@
from .emotes_scanner import EmotesScanner
from .frequency_scanner import FrequencyScanner
from .composition_scanner import CompositionScanner
from .presence_scanner import PresenceScanner
from .full_scanner import FullScanner
from .mentions_scanner import MentionsScanner
from .mentioned_scanner import MentionedScanner
from .messages_scanner import MessagesScanner
from .channels_scanner import ChannelsScanner
from .reactions_scanner import ReactionsScanner
+91
View File
@@ -0,0 +1,91 @@
from typing import Dict, List
from collections import defaultdict
import discord
# Custom libs
from logs import ChannelLogs, MessageLog
from .scanner import Scanner
from data_types import Counter
from utils import COMMON_HELP_ARGS, mention, channel_mention
class ChannelsScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%chan: Rank channels by their messages\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* <n> - top <n>, default is 10\n"
+ "* all/everyone - include bots\n"
+ "Example: %chan 10 @user\n"
+ "```"
)
def __init__(self):
super().__init__(
has_digit_args=True,
valid_args=["all", "everyone"],
help=ChannelsScanner.help(),
intro_context="Channels",
)
async def init(self, message: discord.Message, *args: str) -> bool:
# get max emotes to view
self.top = 10
for arg in args:
if arg.isdigit():
self.top = int(arg)
self.all_messages = "all" in args or "everyone" in args
# Create mentions dict
self.messages = defaultdict(Counter)
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
return ChannelsScanner.analyse_message(
channel.id,
message,
self.messages,
self.raw_members,
all_messages=self.all_messages,
)
def get_results(self, intro: str) -> List[str]:
names = [name for name in self.messages]
names.sort(key=lambda name: self.messages[name].score(), reverse=True)
names = names[: self.top]
usage_count = Counter.total(self.messages)
res = [intro]
res += [
self.messages[name].to_string(
names.index(name),
channel_mention(name),
total_usage=usage_count,
counted="message",
transform=lambda id: f" by {mention(id)}",
)
for name in names
]
return res
@staticmethod
def analyse_message(
channel_id: int,
message: MessageLog,
messages: Dict[str, Counter],
raw_members: List[int],
*,
all_messages: bool,
) -> bool:
impacted = False
if (
len(raw_members) == 0
and (not message.bot or all_messages)
or message.author in raw_members
):
impacted = True
messages[channel_id].update_use(1, message.created_at, message.author)
return impacted
+125
View File
@@ -0,0 +1,125 @@
from typing import List
import re
import discord
# Custom libs
from .scanner import Scanner
from data_types import Composition
from logs import ChannelLogs, MessageLog
from utils import emojis, COMMON_HELP_ARGS
class CompositionScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%compo: Show composition statistics\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* all/everyone - include bots\n"
+ "Example: %compo #mychannel1 @user\n"
+ "```"
)
def __init__(self):
super().__init__(
valid_args=["all", "everyone"],
help=CompositionScanner.help(),
intro_context="Composition",
)
async def init(self, message: discord.Message, *args: str) -> bool:
self.compo = Composition()
self.all_messages = "all" in args or "everyone" in args
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
ret = CompositionScanner.analyse_message(
message, self.compo, self.raw_members, all_messages=self.all_messages
)
return ret
def get_results(self, intro: str) -> List[str]:
res = [intro]
res += self.compo.to_string(self.msg_count)
return res
@staticmethod
def analyse_message(
message: MessageLog,
compo: Composition,
raw_members: List[int],
*,
all_messages: bool,
) -> bool:
impacted = False
# If author is included in the selection (empty list is all)
if (
(not message.bot or all_messages)
and len(raw_members) == 0
or message.author in raw_members
):
impacted = True
compo.total_characters += len(message.content)
emotes_found = emojis.regex.findall(message.content)
without_emote = message.content
for name in emotes_found:
if name in emojis.unicode_list or re.match(
r"(<a?:[\w\-\~]+:\d+>|:[\w\\-\~]+:)", name
):
compo.emotes[name] += 1
i = without_emote.index(name)
without_emote = without_emote[:i] + without_emote[i + len(name) :]
if len(message.content.strip()) > 0 and len(without_emote.strip()) == 0:
compo.emote_only += 1
if len(emotes_found) > 0:
compo.emote_msg += 1
links_found = re.findall(r"https?:\/\/", message.content)
compo.links += len(links_found)
if len(links_found) > 0:
compo.link_msg += 1
mentions = (
len(message.mentions)
+ len(message.role_mentions)
+ len(message.channel_mentions)
)
if message.mention_everyone:
compo.everyone += 1
mentions += 1
if mentions > 0:
compo.mentions += mentions
compo.mention_msg += 1
spoilers_found = re.findall(r"\|\|[^|]+\|\|", message.content)
if len(spoilers_found) > 0:
compo.spoilers += 1
if message.edited_at is not None:
compo.edited += 1
if message.reference is not None:
compo.answers += 1
if message.image:
compo.images += 1
if message.tts:
compo.tts += 1
if (
len(emotes_found) == 0
and message.reference is None
and not message.image
and len(message.mentions) == 0
and len(message.role_mentions) == 0
and len(message.channel_mentions) == 0
and not message.tts
and not message.mention_everyone
and not message.embed
and not message.attachment
):
compo.plain_text += 1
return impacted
+149
View File
@@ -0,0 +1,149 @@
from typing import Dict, List
from collections import defaultdict
import discord
# Custom libs
from logs import ChannelLogs, MessageLog
from data_types import Emote, get_emote_dict
from .scanner import Scanner
from utils import emojis, COMMON_HELP_ARGS, plural, precise
class EmotesScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%emojis: Rank emojis by their usage\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* <n> - top <n> emojis, default is 20\n"
+ "* all - list all common emojis in addition to this guild's\n"
+ "* members - show top member for each emojis\n"
+ "* sort:usage/reaction - other sorting methods\n"
+ "* everyone - include bots\n"
+ "Example: %emojis 10 all #mychannel1 #mychannel2 @user\n"
+ "```"
)
def __init__(self):
super().__init__(
has_digit_args=True,
valid_args=["all", "members", "sort:usage", "sort:reaction", "everyone"],
help=EmotesScanner.help(),
intro_context="Emoji usage",
)
async def init(self, message: discord.Message, *args: str) -> bool:
guild = message.channel.guild
# get max emotes to view
self.top = 20
for arg in args:
if arg.isdigit():
self.top = int(arg)
# check other args
self.all_emojis = "all" in args
self.show_members = "members" in args and (
len(self.members) == 0 or len(self.members) > 1
)
# Create emotes dict from custom emojis of the guild
self.emotes = get_emote_dict(guild)
self.sort = None
if "sort:usage" in args:
self.sort = "usage"
elif "sort:reaction" in args:
self.sort = "reaction"
self.all_messages = "everyone" in args
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
return EmotesScanner.analyse_message(
message,
self.emotes,
self.raw_members,
all_emojis=self.all_emojis,
all_messages=self.all_messages,
)
def get_results(self, intro: str) -> List[str]:
names = [name for name in self.emotes]
names.sort(
key=lambda name: self.emotes[name].score(
usage_weight=(0 if self.sort == "reaction" else 1),
react_weight=(0 if self.sort == "usage" else 1),
),
reverse=True,
)
names = names[: self.top]
# Get the total of all emotes used
usage_count = 0
reaction_count = 0
for name in self.emotes:
usage_count += self.emotes[name].usages
reaction_count += self.emotes[name].reactions
res = [intro]
allow_unused = self.full and len(self.members) == 0
if self.sort is not None:
res += [f"(Sorted by {self.sort})"]
res += [
self.emotes[name].to_string(
names.index(name),
name,
total_usage=usage_count,
total_react=reaction_count,
show_life=False,
show_members=self.show_members or len(self.raw_members) == 0,
)
for name in names
if allow_unused or self.emotes[name].used()
]
res += [
f"Total: {plural(usage_count,'time')} ({precise(usage_count/self.msg_count)}/msg)"
]
if reaction_count > 0:
res[-1] += f" and {plural(reaction_count, 'reaction')}"
return res
@staticmethod
def analyse_message(
message: MessageLog,
emotes: Dict[str, Emote],
raw_members: List[int],
*,
all_emojis: bool,
all_messages: bool,
) -> bool:
impacted = False
# If author is included in the selection (empty list is all)
if (
(not message.bot or all_messages)
and len(raw_members) == 0
or message.author in raw_members
):
impacted = True
# Find all emotes un the current message in the form "<:emoji:123456789>"
# Filter for known emotes
found = emojis.regex.findall(message.content)
# For each emote, update its usage
for name in found:
if name not in emotes:
if not all_emojis or name not in emojis.unicode_list:
continue
emotes[name].usages += 1
emotes[name].update_use(message.created_at, [message.author])
# For each reaction of this message, test if known emote and update when it's the case
for name in message.reactions:
if name not in emotes:
if not all_emojis or name not in emojis.unicode_list:
continue
if len(raw_members) == 0:
emotes[name].reactions += len(message.reactions[name])
emotes[name].update_use(message.created_at, message.reactions[name])
else:
for member in raw_members:
if member in message.reactions[name]:
emotes[name].reactions += 1
emotes[name].update_use(message.created_at, [member])
return impacted
+107
View File
@@ -0,0 +1,107 @@
from typing import List
from datetime import datetime
import discord
# Custom libs
from .scanner import Scanner
from data_types import Frequency
from logs import ChannelLogs, MessageLog
from utils import COMMON_HELP_ARGS
class FrequencyScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%freq: Show frequency-related statistics\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* all/everyone - include bots\n"
+ "Example: %freq #mychannel1 @user\n"
+ "```"
)
def __init__(self):
super().__init__(
valid_args=["all", "everyone"],
help=FrequencyScanner.help(),
intro_context="Frequency",
)
async def init(self, message: discord.Message, *args: str) -> bool:
self.freq = Frequency()
self.all_messages = "all" in args or "everyone" in args
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
return FrequencyScanner.analyse_message(
message, self.freq, self.raw_members, all_messages=self.all_messages
)
def get_results(self, intro: str) -> List[str]:
FrequencyScanner.compute_results(self.freq)
res = [intro]
res += self.freq.to_string()
return res
@staticmethod
def analyse_message(
message: MessageLog,
freq: Frequency,
raw_members: List[int],
*,
all_messages: bool
) -> bool:
impacted = False
# If author is included in the selection (empty list is all)
if (
(not message.bot or all_messages)
and len(raw_members) == 0
or message.author in raw_members
):
impacted = True
freq.dates += [message.created_at]
return impacted
@staticmethod
def compute_results(freq: Frequency):
freq.dates.sort()
latest = freq.dates[0]
current_day = 0
current_day_date = freq.dates[0]
current_day_count = 0
current_hour_buffer = []
for date in freq.dates:
# calculate longest break
delay = date - latest
if delay > freq.longest_break:
freq.longest_break = delay
freq.longest_break_start = latest
latest = date
# calculate busiest weekday / hours
freq.week[date.weekday()] += 1
freq.day[date.hour] += 1
# calculate busiest day ever
start_delta = date - freq.dates[0]
if start_delta.days > current_day:
if current_day_count > freq.busiest_day_count:
freq.busiest_day_count = current_day_count
freq.busiest_day = current_day_date
current_day = start_delta.days
current_day_date = date
current_day_count = 0
else:
current_day_count += 1
# calculate busiest hour ever
while (
len(current_hour_buffer) > 0
and (date - current_hour_buffer[0]).total_seconds() > 3600
):
current_hour_buffer.pop(0)
current_hour_buffer += [date]
if len(current_hour_buffer) > freq.busiest_hour_count:
freq.busiest_hour = current_hour_buffer[0]
freq.busiest_hour_count = len(current_hour_buffer)
+77
View File
@@ -0,0 +1,77 @@
from typing import List
import discord
# Custom libs
from .scanner import Scanner
from . import FrequencyScanner, CompositionScanner, PresenceScanner
from data_types import Frequency, Composition, Presence
from logs import ChannelLogs, MessageLog
from utils import COMMON_HELP_ARGS
class FullScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%scan: Show full statistics\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* all/everyone - include bots\n"
+ "Example: %scan #mychannel1 @user\n"
+ "```"
)
def __init__(self):
super().__init__(
valid_args=["all", "everyone"],
help=FullScanner.help(),
intro_context="Full analysis",
)
async def init(self, message: discord.Message, *args: str) -> bool:
self.freq = Frequency()
self.compo = Composition()
self.pres = Presence()
self.member_specific = len(self.members) > 0
self.all_messages = "all" in args or "everyone" in args
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
FrequencyScanner.analyse_message(
message, self.freq, self.raw_members, all_messages=self.all_messages
)
CompositionScanner.analyse_message(
message, self.compo, self.raw_members, all_messages=self.all_messages
)
PresenceScanner.analyse_message(
channel,
message,
self.pres,
self.raw_members,
all_messages=self.all_messages,
)
return (
(not message.bot or self.all_messages)
and len(self.raw_members) == 0
or message.author in self.raw_members
)
def get_results(self, intro: str) -> List[str]:
FrequencyScanner.compute_results(self.freq)
res = [intro]
res += ["__Frequency__:"]
res += self.freq.to_string()
res += ["__Composition__:"]
res += self.compo.to_string(self.msg_count)
res += ["__Presence__:"]
res += self.pres.to_string(
self.msg_count,
self.total_msg,
show_top_channel=len(self.channels) > 1,
member_specific=self.member_specific,
chan_count=len(self.channels) if not self.full else None,
)
return res
+96
View File
@@ -0,0 +1,96 @@
from typing import Dict, List
from collections import defaultdict
import discord
# Custom libs
from logs import ChannelLogs, MessageLog
from .scanner import Scanner
from data_types import Counter
from utils import COMMON_HELP_ARGS, plural, precise, mention, alt_mention
class MentionedScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%mentioned: Rank specific user's mentions by their usage\n"
+ "arguments:\n"
+ "* @member/me - (required) one or more member\n"
+ "\n".join(COMMON_HELP_ARGS.split("\n")[1:])
+ "* <n> - top <n> mentions, default is 10\n"
+ "* all - include bots mentions\n"
+ "Example: %mentioned 10 @user\n"
+ "```"
)
def __init__(self):
super().__init__(
has_digit_args=True,
valid_args=["all"],
help=MentionedScanner.help(),
intro_context="Mentioned by members",
)
async def init(self, message: discord.Message, *args: str) -> bool:
# get max emotes to view
self.top = 10
for arg in args:
if arg.isdigit():
self.top = int(arg)
if len(self.members) == 0:
await message.channel.send(
"You need to mention at least one member or use `me`", reference=message
)
return False
self.all_mentions = "all" in args
# Create mentions dict
self.mentions = defaultdict(Counter)
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
return MentionedScanner.analyse_message(
message, self.mentions, self.raw_members, all_mentions=self.all_mentions
)
def get_results(self, intro: str) -> List[str]:
names = [name for name in self.mentions]
names.sort(key=lambda name: self.mentions[name].score(), reverse=True)
names = names[: self.top]
# Get the total of all emotes used
usage_count = Counter.total(self.mentions)
res = [intro]
res += [
self.mentions[name].to_string(
names.index(name),
name,
total_usage=usage_count,
)
for name in names
]
res += [
f"Total: {plural(usage_count,'time')} ({precise(usage_count/self.msg_count)}/msg)"
]
return res
@staticmethod
def analyse_message(
message: MessageLog,
mentions: Dict[str, Counter],
raw_members: List[int],
*,
all_mentions: bool,
) -> bool:
impacted = True
if not message.bot or all_mentions:
for member_id in message.mentions:
if member_id in raw_members:
count = message.content.count(
mention(member_id)
) + message.content.count(alt_mention(member_id))
mentions[mention(message.author)].update_use(
count, message.created_at
)
return impacted
+129
View File
@@ -0,0 +1,129 @@
from typing import Dict, List
from collections import defaultdict
import discord
# Custom libs
from logs import ChannelLogs, MessageLog
from .scanner import Scanner
from data_types import Counter
from utils import (
COMMON_HELP_ARGS,
plural,
precise,
mention,
alt_mention,
role_mention,
channel_mention,
)
class MentionsScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%mentions: Rank mentions by their usage\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* <n> - top <n> mentions, default is 10\n"
+ "* all - show role/channel/everyone/here mentions\n"
+ "* everyone - include bots mentions\n"
+ "Example: %mentions 10 #mychannel1 #mychannel2 @user\n"
+ "```"
)
def __init__(self):
super().__init__(
has_digit_args=True,
valid_args=["all", "everyone"],
help=MentionsScanner.help(),
intro_context="Mention usage",
)
async def init(self, message: discord.Message, *args: str) -> bool:
# get max emotes to view
self.top = 10
for arg in args:
if arg.isdigit():
self.top = int(arg)
# check other args
self.all_mentions = "all" in args
# Create mentions dict
self.mentions = defaultdict(Counter)
self.all_messages = "everyone" in args
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
return MentionsScanner.analyse_message(
message,
self.mentions,
self.raw_members,
all_mentions=self.all_mentions,
all_messages=self.all_messages,
)
def get_results(self, intro: str) -> List[str]:
names = [name for name in self.mentions]
names.sort(key=lambda name: self.mentions[name].score(), reverse=True)
names = names[: self.top]
# Get the total of all emotes used
usage_count = Counter.total(self.mentions)
res = [intro]
res += [
self.mentions[name].to_string(
names.index(name),
name,
total_usage=usage_count,
)
for name in names
]
res += [
f"Total: {plural(usage_count,'time')} ({precise(usage_count/self.msg_count)}/msg)"
]
return res
@staticmethod
def analyse_message(
message: MessageLog,
mentions: Dict[str, Counter],
raw_members: List[int],
*,
all_mentions: bool,
all_messages: bool,
) -> bool:
impacted = False
# If author is included in the selection (empty list is all)
if (
(not message.bot or all_messages)
and len(raw_members) == 0
or message.author in raw_members
):
impacted = True
for member_id in message.mentions:
name = mention(member_id)
count = message.content.count(name) + message.content.count(
alt_mention(member_id)
)
mentions[name].update_use(count, message.created_at)
if all_mentions:
for role_id in message.role_mentions:
name = role_mention(role_id)
mentions[name].update_use(
message.content.count(name), message.created_at
)
for channel_id in message.channel_mentions:
name = channel_mention(channel_id)
mentions[name].update_use(
message.content.count(name), message.created_at
)
if "@everyone" in message.content:
mentions["@\u200beveryone"].update_use(
message.content.count("@everyone"), message.created_at
)
if "@here" in message.content:
mentions["@\u200bhere"].update_use(
message.content.count("@here"), message.created_at
)
return impacted
+91
View File
@@ -0,0 +1,91 @@
from typing import Dict, List
from collections import defaultdict
import discord
# Custom libs
from logs import ChannelLogs, MessageLog
from .scanner import Scanner
from data_types import Counter
from utils import COMMON_HELP_ARGS, mention, channel_mention
class MessagesScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%msg: Rank users by their messages\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* <n> - top <n>, default is 10\n"
+ "* all/everyone - include bots\n"
+ "Example: %msg 10 #channel\n"
+ "```"
)
def __init__(self):
super().__init__(
has_digit_args=True,
valid_args=["all", "everyone"],
help=MessagesScanner.help(),
intro_context="Messages",
)
async def init(self, message: discord.Message, *args: str) -> bool:
# get max emotes to view
self.top = 10
for arg in args:
if arg.isdigit():
self.top = int(arg)
self.all_messages = "all" in args or "everyone" in args
# Create mentions dict
self.messages = defaultdict(Counter)
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
return MessagesScanner.analyse_message(
channel.id,
message,
self.messages,
self.raw_members,
all_messages=self.all_messages,
)
def get_results(self, intro: str) -> List[str]:
names = [name for name in self.messages]
names.sort(key=lambda name: self.messages[name].score(), reverse=True)
names = names[: self.top]
usage_count = Counter.total(self.messages)
res = [intro]
res += [
self.messages[name].to_string(
names.index(name),
mention(name),
total_usage=usage_count,
counted="message",
transform=lambda id: f" in {channel_mention(id)}",
)
for name in names
]
return res
@staticmethod
def analyse_message(
channel_id: int,
message: MessageLog,
messages: Dict[str, Counter],
raw_members: List[int],
*,
all_messages: bool,
) -> bool:
impacted = False
if (
len(raw_members) == 0
and (not message.bot or all_messages)
or message.author in raw_members
):
impacted = True
messages[message.author].update_use(1, message.created_at, channel_id)
return impacted
+96
View File
@@ -0,0 +1,96 @@
from typing import List
import discord
# Custom libs
from .scanner import Scanner
from data_types import Presence
from logs import ChannelLogs, MessageLog
from utils import COMMON_HELP_ARGS
class PresenceScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%pres: Show presence statistics\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* all/everyone - include bots\n"
+ "Example: %pres #mychannel1 @user\n"
+ "```"
)
def __init__(self):
super().__init__(
valid_args=["all", "everyone"],
help=PresenceScanner.help(),
intro_context="Presence",
)
async def init(self, message: discord.Message, *args: str) -> bool:
self.pres = Presence()
self.member_specific = len(self.members) > 0
self.all_messages = "all" in args or "everyone" in args
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
return PresenceScanner.analyse_message(
channel,
message,
self.pres,
self.raw_members,
all_messages=self.all_messages,
)
def get_results(self, intro: str) -> List[str]:
res = [intro]
res += self.pres.to_string(
self.msg_count,
self.total_msg,
chan_count=len(self.channels) if not self.full else None,
show_top_channel=(len(self.channels) > 1),
member_specific=self.member_specific,
)
return res
@staticmethod
def analyse_message(
channel: ChannelLogs,
message: MessageLog,
pres: Presence,
raw_members: List[int],
*,
all_messages: bool,
) -> bool:
impacted = False
# If author is included in the selection (empty list is all)
if (
(not message.bot or all_messages)
and len(raw_members) == 0
or message.author in raw_members
):
impacted = True
pres.channel_usage[channel.id] += 1
for mention in message.mentions:
pres.mention_others[mention] += 1
pres.messages[message.author] += 1
pres.channel_total[channel.id] += 1
pres.mention_count[message.author] += len(message.mentions)
if len(raw_members) > 0:
for mention in message.mentions:
if mention in raw_members:
pres.mentions[message.author] += 1
for reaction in message.reactions:
for member_id in message.reactions[reaction]:
pres.used_reaction[member_id] += 1
if member_id in raw_members:
pres.reactions[reaction] += 1
else:
for reaction in message.reactions:
pres.reactions[reaction] += len(message.reactions[reaction])
for member_id in message.reactions[reaction]:
pres.used_reaction[member_id] += 1
return impacted
+82
View File
@@ -0,0 +1,82 @@
from typing import Dict, List
from collections import defaultdict
import discord
# Custom libs
from logs import ChannelLogs, MessageLog
from .scanner import Scanner
from data_types import Counter
from utils import COMMON_HELP_ARGS, mention, channel_mention
class ReactionsScanner(Scanner):
@staticmethod
def help() -> str:
return (
"```\n"
+ "%react: Rank users by their reactions\n"
+ "arguments:\n"
+ COMMON_HELP_ARGS
+ "* <n> - top <n>, default is 10\n"
+ "Example: %react 10 #channel\n"
+ "```"
)
def __init__(self):
super().__init__(
has_digit_args=True,
help=ReactionsScanner.help(),
intro_context="Reactions",
)
async def init(self, message: discord.Message, *args: str) -> bool:
# get max emotes to view
self.top = 10
for arg in args:
if arg.isdigit():
self.top = int(arg)
# Create mentions dict
self.messages = defaultdict(Counter)
return True
def compute_message(self, channel: ChannelLogs, message: MessageLog):
return ReactionsScanner.analyse_message(
channel.id,
message,
self.messages,
self.raw_members,
)
def get_results(self, intro: str) -> List[str]:
names = [name for name in self.messages]
names.sort(key=lambda name: self.messages[name].score(), reverse=True)
names = names[: self.top]
usage_count = Counter.total(self.messages)
res = [intro]
res += [
self.messages[name].to_string(
names.index(name),
mention(name),
total_usage=usage_count,
counted="reaction",
transform=lambda id: f" in {channel_mention(id)}",
)
for name in names
]
return res
@staticmethod
def analyse_message(
channel_id: int,
message: MessageLog,
messages: Dict[str, Counter],
raw_members: List[int],
) -> bool:
impacted = True
for reaction in message.reactions:
for member_id in message.reactions[reaction]:
if len(raw_members) == 0 or member_id in raw_members:
messages[member_id].update_use(1, message.created_at, channel_id)
return impacted
+170
View File
@@ -0,0 +1,170 @@
from abc import ABC, abstractmethod
from typing import List
from datetime import datetime
import logging
import re
import discord
from utils import no_duplicate, get_intro, delta, deltas, mention, channel_mention
from logs import GuildLogs, ChannelLogs, MessageLog, ALREADY_RUNNING, CANCELLED
class Scanner(ABC):
def __init__(
self,
*,
has_digit_args: bool = False,
valid_args: List[str] = [],
help: str,
intro_context: str,
):
self.has_digit_args = has_digit_args
self.valid_args = valid_args
self.help = help
self.intro_context = intro_context
self.members = []
self.raw_members = []
self.full = False
self.channels = []
self.msg_count = 0
self.chan_count = 0
async def compute(
self, client: discord.client, message: discord.Message, *args: str
):
args = list(args)
guild = message.guild
logs = GuildLogs(guild)
# If "%cmd help" redirect to "%help cmd"
if "help" in args:
await client.bot.help(client, message, "help", args[0])
return
# check args validity
str_channel_mentions = [str(channel.id) for channel in message.channel_mentions]
str_mentions = [str(member.id) for member in message.mentions]
for i, arg in enumerate(args[1:]):
if re.match(r"^<@!?\d+>$", arg):
arg = arg[3:-1] if "!" in arg else arg[2:-1]
elif re.match(r"^<#!?\d+>$", arg):
arg = arg[3:-1] if "!" in arg else arg[2:-1]
if (
arg not in self.valid_args + ["me", "here", "fast", "fresh"]
and (not arg.isdigit() or not self.has_digit_args)
and arg not in str_channel_mentions
and arg not in str_mentions
):
await message.channel.send(
f"Unrecognized argument: `{arg}`", reference=message
)
return
# Get selected channels or all of them if no channel arguments
self.channels = no_duplicate(message.channel_mentions)
# transform the "here" arg
if "here" in args:
self.channels += [message.channel]
self.full = len(self.channels) == 0
if self.full:
self.channels = guild.text_channels
# Get selected members
self.members = no_duplicate(message.mentions)
self.raw_members = no_duplicate(message.raw_mentions)
# transform the "me" arg
if "me" in args:
self.members += [message.author]
self.raw_members += [message.author.id]
if not await self.init(message, *args):
return
# Start computing data
async with message.channel.typing():
progress = await message.channel.send(
"```Starting analysis...```",
reference=message,
allowed_mentions=discord.AllowedMentions.none(),
)
total_msg, total_chan = await logs.load(
progress, self.channels, fast="fast" in args, fresh="fresh" in args
)
if total_msg == CANCELLED:
await message.channel.send(
"Operation cancelled by user",
reference=message,
)
elif total_msg == ALREADY_RUNNING:
await message.channel.send(
"An analysis is already running on this server, please be patient.",
reference=message,
)
else:
self.msg_count = 0
self.total_msg = 0
self.chan_count = 0
t0 = datetime.now()
for channel in self.channels:
channel_logs = logs.channels[channel.id]
count = sum(
[
self.compute_message(channel_logs, message_log)
for message_log in channel_logs.messages
]
)
self.total_msg += len(channel_logs.messages)
self.msg_count += count
self.chan_count += 1 if count > 0 else 0
logging.info(f"scan {guild.id} > scanned in {delta(t0):,}ms")
await progress.edit(content="```Computing results...```")
# Display results
t0 = datetime.now()
results = self.get_results(
get_intro(
self.intro_context,
self.full,
self.channels,
self.members,
self.msg_count,
self.chan_count,
)
)
logging.info(f"scan {guild.id} > results in {delta(t0):,}ms")
response = ""
first = True
for r in results:
if len(response + "\n" + r) > 2000:
await message.channel.send(
response,
reference=message if first else None,
allowed_mentions=discord.AllowedMentions.none(),
)
first = False
response = ""
response += "\n" + r
if len(response) > 0:
await message.channel.send(
response,
reference=message if first else None,
allowed_mentions=discord.AllowedMentions.none(),
)
# Delete custom progress message
await progress.delete()
@abstractmethod
async def init(self, message: discord.Message, *args: str) -> bool:
pass
@abstractmethod
def compute_message(self, channel: ChannelLogs, message: MessageLog) -> bool:
pass
@abstractmethod
def get_results(self, intro: str):
pass
+1
View File
@@ -0,0 +1 @@
from .utils import *
+88
View File
@@ -0,0 +1,88 @@
import re
import json
import logging
from . import get_resource_path
EXTRA_EMOJI = {
"thumbup": "1f44d",
"thumbdown": "1f44e",
"timer": "23f2-fe0f",
"cowboy": "1f920",
"clown": "1f921",
"newspaper2": "1f5de-fe0f",
"french_bread": "1f956",
"nerd": "1f913",
"zipper_mouth": "1f910",
"salad": "1f957",
"rolling_eyes": "1f644",
"basketball_player": "26f9-fe0f-200d-2642-fe0f",
"thinking": "1f914",
"e_mail": "2709-fe0f",
"slight_frown": "1f641",
"skull_crossbones": "2620-fe0f",
"hand_splayed": "1f590-fe0f",
"speaking_head": "1f5e3-fe0f",
"cross": "271d-fe0f",
"crayon": "1f58d-fe0f",
"head_bandage": "1f915",
"rofl": "1f923",
"flag_white": "1f3f3-fe0f",
"slight_smile": "1f642",
"fork_knife_plate": "1f37d-fe0f",
"robot": "1f916",
"hugging": "1f917",
"biohazard": "2623-fe0f",
"notepad_spiral": "1f5d2-fe0f",
"lifter": "1f3cb-fe0f-200d-2642-fe0f",
"race_car": "1f3ce-fe0f",
"left_facing_fist": "1f91b",
"right_facing_fist": "1f91c",
"tools": "1f6e0-fe0f",
"umbrella2": "2602-fe0f",
"upside_down": "2b07-fe0f",
"first_place": "1f947",
"dagger": "1f5e1-fe0f",
"fox": "1f98a",
"menorah": "1f54e",
"desktop": "1f5a5-fe0f",
"motorcycle": "1f3cd-fe0f",
"levitate": "1f574-fe0f",
"cheese": "1f9c0",
"fingers_crossed": "1f91e",
"frowning2": "1f626",
"microphone2": "1f399-fe0f",
"flag_black": "1f3f4",
"chair": "1FA91",
"champagne_glass": "1F942",
"raised_hand": "270B",
"knife": "1F52A",
"postal_horn": "1F4EF",
"punch": "1F44A",
}
global_list = {}
unicode_list = []
regex = re.compile("(<a?:[\\w\\-\\~]+:\\d+>|:[\\w\\-\\~]+:)")
def load_emojis():
global global_list, unicode_list, regex
emoji_list = []
with open(get_resource_path("emoji.json"), mode="r") as f:
emoji_list = json.loads(f.readline().strip())
for emoji in EXTRA_EMOJI:
emoji_list += [{"short_name": emoji, "unified": EXTRA_EMOJI[emoji]}]
unicode_list_escaped = []
for emoji in emoji_list:
shortname = emoji["short_name"]
unified = emoji["unified"]
if unified is not None and shortname is not None:
unicode_escaped = "".join([f"\\U{c:0>8}" for c in unified.split("-")])
unicode = bytes(unicode_escaped, "ascii").decode("unicode-escape")
shortcode = shortname.replace("-", "_")
global_list[unicode] = f":{shortcode}:"
unicode_list += [unicode]
unicode_list_escaped += [unicode_escaped]
regex = re.compile(f"(<a?:\\w+:\\d+>|:\\w+:|{'|'.join(unicode_list_escaped)})")
logging.info(f"loaded {len(unicode_list)} emojis")
+220
View File
@@ -0,0 +1,220 @@
from typing import List, Dict, Union, Optional, Any
import os
import logging
import discord
import math
from datetime import datetime
# OTHER
COMMON_HELP_ARGS = (
""
+ "* @member/me - filter for one or more member\n"
+ "* #channel/here - filter for one or more channel\n"
+ "* fast - only read cache\n"
+ "* fresh - does not read cache (long)\n"
)
def delta(t0: datetime):
return round((datetime.now() - t0).total_seconds() * 1000)
def deltas(t0: datetime):
return (datetime.now() - t0).total_seconds()
# DISCORD API
def debug(message: discord.Message, txt: str):
logging.info(f"{message.guild} > #{message.channel}: {txt}")
async def code_message(message: discord.Message, content: str):
await message.edit(content=f"```\n{content}\n```")
def mention(member_id: int) -> str:
return f"<@{member_id}>"
def alt_mention(member_id: int) -> str:
return f"<@!{member_id}>"
def role_mention(role_id: int) -> str:
return f"<@&{role_id}>"
def channel_mention(channel_id: int) -> str:
return f"<#{channel_id}>"
class FakeMessage:
def __init__(self, id: int):
self.id = id
# FILE
def is_extension(filepath: str, ext_list: List[str]) -> bool:
filename, file_extension = os.path.splitext(filepath.lower())
return file_extension in ext_list
def get_resource_path(filename: str) -> str:
return os.path.realpath(
os.path.join(os.path.dirname(__file__), "..", "resources", filename)
)
# LISTS
def no_duplicate(seq: list) -> list:
"""
Remove any duplicates on a list
:param seq: original list
:type seq: list
:return: same list with no duplicates
:rtype: list
"""
return list(dict.fromkeys(seq))
# DICTS
def top_key(d: Dict[Union[str, int], int]) -> Union[str, int]:
return sorted(d, key=lambda k: d[k])[-1]
def val_sum(d: Dict[Any, int]) -> int:
return sum(d.values())
# MESSAGE FORMATTING
def aggregate(names: List[str]) -> str:
"""
Aggregate names with , and &
Example : "a, b, c & d"
"""
if len(names) == 0:
return ""
elif len(names) == 1:
return names[0]
else:
return ", ".join(names[:-1]) + " & " + names[-1]
def plural(count: int, word: str) -> str:
return f"{count:,} {word}{'s' if count != 1 else ''}"
def percent(p: float) -> str:
return f"{precise(100*p)}%"
def precise(p: float, *, precision: int = 2) -> str:
if p == 0:
return "0"
precision = abs(min(0, math.ceil(math.log10(p)) - precision))
s = "{:." + str(precision) + "f}"
return s.format(p)
# DATE FORMATTING
def str_date(date: datetime) -> str:
return date.strftime("%d %b. %Y") # 12 Jun. 2018
def str_datetime(date: datetime) -> str:
return date.strftime("%H:%M, %d %b. %Y") # 12:05, 12 Jun. 2018
def from_now(src: Optional[datetime]) -> str:
if src is None:
return "never"
delay = datetime.utcnow() - src
seconds = delay.seconds
minutes = seconds // 60
hours = minutes // 60
if delay.days < 1:
if hours < 1:
if minutes == 0:
return "now"
elif minutes == 1:
return "a minute ago"
else:
return f"{minutes} minutes ago"
elif hours == 1:
return "an hour ago"
else:
return f"{hours} hours ago"
elif delay.days == 1:
return "yesterday"
else:
return f"{delay.days:,} days ago"
# APP SPECIFIC
def get_intro(
subject: str,
full: bool,
channels: List[discord.TextChannel],
members: List[discord.Member],
nmm: int, # number of messages impacted
nc: int, # number of impacted channels
) -> str:
"""
Get the introduction sentence of the response
"""
# Show all data (members, channels) when it's less than 5 units
if len(members) == 0:
# Full scan of the server
if full:
return f"{subject} in this server ({nc} channels, {nmm:,} messages):"
elif len(channels) < 5:
return f"{aggregate([c.mention for c in channels])} {subject.lower()} in {nmm:,} messages:"
else:
return (
f"These {len(channels)} channels {subject.lower()} in {nmm:,} messages:"
)
elif len(members) < 5:
if full:
return f"{aggregate([m.mention for m in members])} {subject.lower()} in {nmm:,} messages:"
elif len(channels) < 5:
return (
f"{aggregate([m.mention for m in members])} on {aggregate([c.mention for c in channels])} "
f"{subject.lower()} in {nmm:,} messages:"
)
else:
return (
f"{aggregate([m.mention for m in members])} on these {len(channels)} channels "
f"{subject.lower()} in {nmm:,} messages:"
)
else:
if full:
return (
f"These {len(members)} members {subject.lower()} in {nmm:,} messages:"
)
elif len(channels) < 5:
return (
f"These {len(members)} members on {aggregate([c.mention for c in channels])} "
f"{subject.lower()} in {nmm:,} messages:"
)
else:
return (
f"These {len(members)} members on these {len(channels)} channels "
f"{subject.lower()} in {nmm:,} messages:"
)
-48
View File
@@ -1,48 +0,0 @@
# DISCORD API
def debug(message, txt):
"""
Print a log with the context of the current event
:param message: message that triggered the event
:type message: discord.Message
:param txt: text of the log
:type txt: str
"""
print(f"{message.guild} > #{message.channel}: {txt}")
# LISTS
def no_duplicate(seq):
"""
Remove any duplicates on a list
:param seq: original list
:type seq: list
:return: same list with no duplicates
:rtype: list
"""
return list(dict.fromkeys(seq))
# MESSAGE FORMATTING
def aggregate(names):
"""
Aggregate names with , and &
Example : "a, b, c & d"
:param names: list of names
:type names: list[str]
:return: correct aggregation
:rtype: str
"""
if len(names) == 0:
return ""
elif len(names) == 1:
return names[0]
else:
return ", ".join(names[:-1]) + " & " + names[-1]