16 Commits

Author SHA1 Message Date
Klemek 8cc0e1fe65 small fix (#26)
* updated README

* improved %words command

* new words scanner

* fix test

* concurrent fast analysis

* fast analysis if fresh

* better memory handling

* fix "stuck" bug

* updated README

* improved %words command

* small fix
2021-04-07 19:36:24 +02:00
Klemek b018650ce4 rebase
* updated README

* improved %words command

* new words scanner

* fix test

* concurrent fast analysis

* fast analysis if fresh

* better memory handling

* fix "stuck" bug

* updated README

* improved %words command
2021-04-07 19:31:02 +02:00
Klemek 7d9a07af9c improved %words command 2021-04-07 19:29:26 +02:00
Klemek 6dcf6500f8 updated README 2021-04-07 19:04:48 +02:00
Klemek 88e7a7fe94 Merge pull request #21 from Klemek/dev
v1.12
2021-04-07 19:02:03 +02:00
Klemek 40dc5d3c62 fix "stuck" bug 2021-04-07 18:58:35 +02:00
Klemek 77d512fca8 Merge pull request #20 from Klemek/f-fix-memory-leak
better memory handling
2021-04-07 18:43:16 +02:00
Klemek 562fd51c91 better memory handling 2021-04-07 18:41:07 +02:00
Klemek 45d56a3acb Merge pull request #18 from Klemek/f-better-fast
better fast
2021-04-07 15:11:36 +02:00
Klemek ac782b4ea4 fast analysis if fresh 2021-04-07 15:09:10 +02:00
Klemek 91ae6ed383 concurrent fast analysis 2021-04-07 14:55:54 +02:00
Klemek f97682f46a fix test 2021-04-07 14:38:02 +02:00
Klemek 85a9ac0414 Merge pull request #17 from Klemek/f-words
%words for a top list of words used
2021-04-07 14:36:17 +02:00
Klemek 653f91dda3 new words scanner 2021-04-07 14:35:23 +02:00
Klemek d2cc7afc88 Merge pull request #12 from Klemek/dev
remove non serializable from dicts
2021-04-06 23:39:32 +02:00
Klemek 728f593061 remove non serializable from dicts 2021-04-06 23:38:42 +02:00
9 changed files with 370 additions and 151 deletions
+1
View File
@@ -5,3 +5,4 @@ __pycache__
error_*
*.log
/logs/
.vscode
+12 -2
View File
@@ -43,7 +43,11 @@
* %react - rank users by their reactions
* arguments:
* <n> - top <n> messages, default is 10
* %cancel - cancel current analysis
* %words - rank words by their usage
* arguments:
* <n> - words containing <n> or more letters, default is 3
* <n2> - top <n2> words, default is 10
* %cancel - cancel current analysis (not launched with fast)
* Common arguments:
* @member/me: filter for one or more member
@@ -100,8 +104,14 @@ python3 src/main.py
## Changelog
* **v1.12**
* more scans: `%words`
* concurrent `fast` analysis
* assume `fast` if last analysis is fresh
* better memory handling
* bug fix
* **v1.11**
* more scans `%first`, `%rand`, `%last`
* more scans: `%first`, `%rand`, `%last`
* streak computing in `%pres`
* **v1.10**
* multithreading for queries
+10 -2
View File
@@ -28,7 +28,9 @@ class ChannelLogs:
if channel["last_message_id"] is not None
else None
)
self.messages = [MessageLog(message, self) for message in channel["messages"]]
self.messages = [
MessageLog(message, self) for message in channel["messages"]
]
def is_format(self):
return self.format == FORMAT
@@ -38,7 +40,12 @@ class ChannelLogs:
self.channel = channel
try:
if self.last_message_id is not None: # append
while self.last_message_id != channel.last_message_id:
tmp_message_id = None
while (
self.last_message_id != channel.last_message_id
and self.last_message_id != tmp_message_id
):
tmp_message_id = self.last_message_id
async for message in channel.history(
limit=CHUNK_SIZE,
after=FakeMessage(self.last_message_id),
@@ -78,5 +85,6 @@ class ChannelLogs:
def dict(self) -> dict:
    """Return a serialisable copy of this channel log.

    The ``channel`` and ``guild`` attributes are left out of the copy and
    every entry of ``self.messages`` is replaced by the result of its own
    ``dict()`` call.
    """
    excluded = ("channel", "guild")
    exported = {
        name: value for name, value in vars(self).items() if name not in excluded
    }
    serialised_messages = []
    for message_log in self.messages:
        serialised_messages.append(message_log.dict())
    exported["messages"] = serialised_messages
    return exported
+87 -29
View File
@@ -4,6 +4,7 @@ import discord
import json
import gzip
from datetime import datetime
import time
import logging
import asyncio
import threading
@@ -22,6 +23,8 @@ current_analysis_lock = threading.Lock()
ALREADY_RUNNING = -100
CANCELLED = -200
MIN_MODIFICATION_TIME = 5 * 60
class Worker:
def __init__(self, channel_log: ChannelLogs, channel: discord.TextChannel):
@@ -53,12 +56,39 @@ class GuildLogs:
self.guild = guild
self.log_file = os.path.join(LOG_DIR, f"{guild.id}.logz")
self.channels = {}
self.locked = False
def __enter__(self):
return self
def __exit__(self, type, value, tb):
del self.channels
del self.guild
if self.locked:
self.unlock()
def dict(self) -> dict:
    """Return a mapping of every key of ``self.channels`` to that channel's ``dict()``."""
    serialised = {}
    for channel_id, channel_logs in self.channels.items():
        serialised[channel_id] = channel_logs.dict()
    return serialised
def check_cancelled(self) -> bool:
return self.log_file not in current_analysis
return self.locked and self.log_file not in current_analysis
def lock(self) -> bool:
    """Try to register this guild's log file in ``current_analysis``.

    Returns:
        True when the log file was not registered yet and has now been
        appended to ``current_analysis``; False when it was already there.

    ``self.locked`` is only set once the registration succeeded. Setting it
    up-front would make a failed attempt look like ownership, and
    ``__exit__`` would then call ``unlock()`` and remove the entry that
    belongs to the analysis actually running.
    """
    # The context manager releases the guard on every exit path.
    with current_analysis_lock:
        if self.log_file in current_analysis:
            # Already registered: leave self.locked untouched.
            return False
        current_analysis.append(self.log_file)
        self.locked = True
        return True
def unlock(self):
    """Clear ``self.locked`` and remove this guild's log file from ``current_analysis``.

    Calling it when the log file is not registered is harmless: the
    membership test skips the removal.
    """
    self.locked = False
    # The context manager releases the guard on every exit path.
    with current_analysis_lock:
        if self.log_file in current_analysis:
            current_analysis.remove(self.log_file)
async def load(
self,
@@ -68,19 +98,18 @@ class GuildLogs:
fast: bool,
fresh: bool,
) -> Tuple[int, int]:
current_analysis_lock.acquire()
if self.log_file in current_analysis:
current_analysis_lock.release()
self.locked = False
if not fast and not self.lock():
return ALREADY_RUNNING, 0
current_analysis.append(self.log_file)
current_analysis_lock.release()
t00 = datetime.now()
# read logs
if not os.path.exists(LOG_DIR):
os.mkdir(LOG_DIR)
last_time = None
if os.path.exists(self.log_file):
channels = {}
try:
last_time = os.path.getmtime(self.log_file)
gziped_data = None
await code_message(progress, "Reading saved history (1/4)...")
t0 = datetime.now()
@@ -92,6 +121,7 @@ class GuildLogs:
await code_message(progress, "Reading saved history (2/4)...")
t0 = datetime.now()
json_data = gzip.decompress(gziped_data)
del gziped_data
logging.info(
f"log {self.guild.id} > gzip decompress in {delta(t0):,}ms"
)
@@ -100,6 +130,7 @@ class GuildLogs:
await code_message(progress, "Reading saved history (3/4)...")
t0 = datetime.now()
channels = json.loads(json_data)
del json_data
logging.info(f"log {self.guild.id} > json parse in {delta(t0):,}ms")
if self.check_cancelled():
return CANCELLED, 0
@@ -122,31 +153,54 @@ class GuildLogs:
else:
fast = False
if len(target_channels) == 0:
target_channels = (
self.channels.values() if fast else self.guild.text_channels
)
elif fast:
# select already loaded channels only
target_channels_tmp = [
channel for channel in target_channels if channel.id in self.channels
]
if len(target_channels_tmp) == 0:
fast = False
else:
target_channels = target_channels_tmp
# assume fast if file is fresh
if (
not fast
and not fresh
and last_time is not None
and (time.time() - last_time) < MIN_MODIFICATION_TIME
):
invalid_target_channels = [
channel
for channel in target_channels
if channel.id not in self.channels
]
if len(invalid_target_channels) == 0:
fast = True
if self.locked:
self.unlock()
total_msg = 0
total_chan = 0
if fast:
if len(target_channels) == 0:
total_msg = sum(
[len(channel.messages) for channel in self.channels.values()]
)
total_chan = len(self.channels)
else:
target_channels_id = [channel.id for channel in target_channels]
total_msg = sum(
[
len(channel.messages)
for channel in self.channels.values()
if channel.id in target_channels_id
]
)
total_chan = len(target_channels)
target_channels_id = [channel.id for channel in target_channels]
total_msg = sum(
[
len(channel.messages)
for channel in self.channels.values()
if channel.id in target_channels_id
]
)
total_chan = len(target_channels)
else:
if not self.locked and not self.lock():
return ALREADY_RUNNING, 0
# load channels
t0 = datetime.now()
if len(target_channels) == 0:
target_channels = (
self.guild.text_channels if not fast else self.channels.keys()
)
loading_new = 0
queried_msg = 0
total_chan = 0
@@ -225,6 +279,7 @@ class GuildLogs:
)
t0 = datetime.now()
gziped_data = gzip.compress(json_data)
del json_data
logging.info(
f"log {self.guild.id} > gzip in {delta(t0):,}ms -> {real_total_msg / deltas(t0):,.3f} m/s"
)
@@ -237,6 +292,7 @@ class GuildLogs:
t0 = datetime.now()
with open(self.log_file, mode="wb") as f:
f.write(gziped_data)
del gziped_data
logging.info(
f"log {self.guild.id} > saved in {delta(t0):,}ms -> {real_total_msg / deltas(t0):,.3f} m/s"
)
@@ -247,9 +303,10 @@ class GuildLogs:
f"Analysing...\n{total_msg:,} messages in {total_chan:,} channels",
)
logging.info(f"log {self.guild.id} > TOTAL TIME: {delta(t00):,}ms")
current_analysis_lock.acquire()
current_analysis.remove(self.log_file)
current_analysis_lock.release()
if self.locked:
current_analysis_lock.acquire()
current_analysis.remove(self.log_file)
current_analysis_lock.release()
return total_msg, total_chan
@staticmethod
@@ -262,5 +319,6 @@ class GuildLogs:
else:
current_analysis_lock.release()
await message.channel.send(
f"No analysis are currently running on this server", reference=message
f"No cancellable analysis are currently running on this server",
reference=message,
)
+1
View File
@@ -79,6 +79,7 @@ class MessageLog:
def dict(self) -> dict:
message = dict(self.__dict__)
message.pop("channel", None)
message["created_at"] = self.created_at.isoformat()
message["edited_at"] = (
self.edited_at.isoformat() if self.edited_at is not None else None
+10 -3
View File
@@ -21,6 +21,7 @@ from scanners import (
FirstScanner,
RandomScanner,
LastScanner,
WordsScanner,
)
from logs import GuildLogs
@@ -32,7 +33,7 @@ emojis.load_emojis()
bot = Bot(
"Discord Analyst",
"1.11",
"1.12",
alias="%",
)
@@ -41,8 +42,8 @@ bot.log_calls = True
bot.register_command(
"(cancel|stop)",
GuildLogs.cancel,
"cancel: stop current analysis",
"```\n" + "%cancel: Stop current analysis\n" + "```",
"cancel: stop current analysis (not launched with fast)",
"```\n" + "%cancel: Stop current analysis (not launched with fast)\n" + "```",
)
bot.register_command(
"last",
@@ -62,6 +63,12 @@ bot.register_command(
"first: read first message",
FirstScanner.help(),
)
bot.register_command(
"words",
lambda *args: WordsScanner().compute(*args),
"words: rank words by their usage",
WordsScanner.help(),
)
bot.register_command(
"mentioned",
lambda *args: MentionedScanner().compute(*args),
+2 -1
View File
@@ -10,4 +10,5 @@ from .channels_scanner import ChannelsScanner
from .reactions_scanner import ReactionsScanner
from .first_scanner import FirstScanner
from .last_scanner import LastScanner
from .random_scanner import RandomScanner
from .random_scanner import RandomScanner
from .words_scanner import WordsScanner
+115 -114
View File
@@ -36,133 +36,134 @@ class Scanner(ABC):
):
args = list(args)
guild = message.guild
logs = GuildLogs(guild)
# If "%cmd help" redirect to "%help cmd"
if "help" in args:
await client.bot.help(client, message, "help", args[0])
return
# check args validity
str_channel_mentions = [str(channel.id) for channel in message.channel_mentions]
str_mentions = [str(member.id) for member in message.mentions]
for i, arg in enumerate(args[1:]):
if re.match(r"^<@!?\d+>$", arg):
arg = arg[3:-1] if "!" in arg else arg[2:-1]
elif re.match(r"^<#!?\d+>$", arg):
arg = arg[3:-1] if "!" in arg else arg[2:-1]
if (
arg not in self.valid_args + ["me", "here", "fast", "fresh"]
and (not arg.isdigit() or not self.has_digit_args)
and arg not in str_channel_mentions
and arg not in str_mentions
):
await message.channel.send(
f"Unrecognized argument: `{arg}`", reference=message
)
with GuildLogs(guild) as logs:
# If "%cmd help" redirect to "%help cmd"
if "help" in args:
await client.bot.help(client, message, "help", args[0])
return
# Get selected channels or all of them if no channel arguments
self.channels = no_duplicate(message.channel_mentions)
# transform the "here" arg
if "here" in args:
self.channels += [message.channel]
self.full = len(self.channels) == 0
if self.full:
self.channels = guild.text_channels
# Get selected members
self.members = no_duplicate(message.mentions)
self.raw_members = no_duplicate(message.raw_mentions)
# transform the "me" arg
if "me" in args:
self.members += [message.author]
self.raw_members += [message.author.id]
if not await self.init(message, *args):
return
# Start computing data
async with message.channel.typing():
progress = await message.channel.send(
"```Starting analysis...```",
reference=message,
allowed_mentions=discord.AllowedMentions.none(),
)
total_msg, total_chan = await logs.load(
progress, self.channels, fast="fast" in args, fresh="fresh" in args
)
if total_msg == CANCELLED:
await message.channel.send(
"Operation cancelled by user",
reference=message,
)
elif total_msg == ALREADY_RUNNING:
await message.channel.send(
"An analysis is already running on this server, please be patient.",
reference=message,
)
else:
self.msg_count = 0
self.total_msg = 0
self.chan_count = 0
t0 = datetime.now()
for channel in self.channels:
if channel.id in logs.channels:
channel_logs = logs.channels[channel.id]
count = sum(
[
self.compute_message(channel_logs, message_log)
for message_log in channel_logs.messages
]
)
self.total_msg += len(channel_logs.messages)
self.msg_count += count
self.chan_count += 1 if count > 0 else 0
logging.info(f"scan {guild.id} > scanned in {delta(t0):,}ms")
if self.total_msg == 0:
# check args validity
str_channel_mentions = [
str(channel.id) for channel in message.channel_mentions
]
str_mentions = [str(member.id) for member in message.mentions]
for i, arg in enumerate(args[1:]):
if re.match(r"^<@!?\d+>$", arg):
arg = arg[3:-1] if "!" in arg else arg[2:-1]
elif re.match(r"^<#!?\d+>$", arg):
arg = arg[3:-1] if "!" in arg else arg[2:-1]
if (
arg not in self.valid_args + ["me", "here", "fast", "fresh"]
and (not arg.isdigit() or not self.has_digit_args)
and arg not in str_channel_mentions
and arg not in str_mentions
):
await message.channel.send(
"There are no messages found matching the filters",
f"Unrecognized argument: `{arg}`", reference=message
)
return
# Get selected channels or all of them if no channel arguments
self.channels = no_duplicate(message.channel_mentions)
# transform the "here" arg
if "here" in args:
self.channels += [message.channel]
self.full = len(self.channels) == 0
if self.full:
self.channels = guild.text_channels
# Get selected members
self.members = no_duplicate(message.mentions)
self.raw_members = no_duplicate(message.raw_mentions)
# transform the "me" arg
if "me" in args:
self.members += [message.author]
self.raw_members += [message.author.id]
if not await self.init(message, *args):
return
# Start computing data
async with message.channel.typing():
progress = await message.channel.send(
"```Starting analysis...```",
reference=message,
allowed_mentions=discord.AllowedMentions.none(),
)
total_msg, total_chan = await logs.load(
progress, self.channels, fast="fast" in args, fresh="fresh" in args
)
if total_msg == CANCELLED:
await message.channel.send(
"Operation cancelled by user",
reference=message,
)
elif total_msg == ALREADY_RUNNING:
await message.channel.send(
"An analysis is already running on this server, please be patient.",
reference=message,
)
else:
await progress.edit(content="```Computing results...```")
# Display results
self.msg_count = 0
self.total_msg = 0
self.chan_count = 0
t0 = datetime.now()
results = self.get_results(
get_intro(
self.intro_context,
self.full,
self.channels,
self.members,
self.msg_count,
self.chan_count,
for channel in self.channels:
if channel.id in logs.channels:
channel_logs = logs.channels[channel.id]
count = sum(
[
self.compute_message(channel_logs, message_log)
for message_log in channel_logs.messages
]
)
self.total_msg += len(channel_logs.messages)
self.msg_count += count
self.chan_count += 1 if count > 0 else 0
logging.info(f"scan {guild.id} > scanned in {delta(t0):,}ms")
if self.total_msg == 0:
await message.channel.send(
"There are no messages found matching the filters",
reference=message,
)
)
logging.info(f"scan {guild.id} > results in {delta(t0):,}ms")
response = ""
first = True
for r in results:
if len(response + "\n" + r) > 2000:
else:
await progress.edit(content="```Computing results...```")
# Display results
t0 = datetime.now()
results = self.get_results(
get_intro(
self.intro_context,
self.full,
self.channels,
self.members,
self.msg_count,
self.chan_count,
)
)
logging.info(f"scan {guild.id} > results in {delta(t0):,}ms")
response = ""
first = True
for r in results:
if len(response + "\n" + r) > 2000:
await message.channel.send(
response,
reference=message if first else None,
allowed_mentions=discord.AllowedMentions.none(),
)
first = False
response = ""
response += "\n" + r
if len(response) > 0:
await message.channel.send(
response,
reference=message if first else None,
allowed_mentions=discord.AllowedMentions.none(),
)
first = False
response = ""
response += "\n" + r
if len(response) > 0:
await message.channel.send(
response,
reference=message if first else None,
allowed_mentions=discord.AllowedMentions.none(),
)
# Delete custom progress message
await progress.delete()
# Delete custom progress message
await progress.delete()
@abstractmethod
async def init(self, message: discord.Message, *args: str) -> bool:
+132
View File
@@ -0,0 +1,132 @@
from typing import Dict, List
from collections import defaultdict
import discord
import re
# Custom libs
from logs import ChannelLogs, MessageLog
from .scanner import Scanner
from data_types import Counter
from utils import (
COMMON_HELP_ARGS,
plural,
precise,
)
class WordsScanner(Scanner):
    """Scanner backing the ``%words`` command: ranks words by their usage."""

    # Suffixes folded into their base word, tried in this order.
    special_cases = ["'s", "s"]

    # Whitespace-separated blocks starting with a scheme ("xxx://") are dropped.
    _URL_BLOCK = re.compile(r"^\w+:\/\/")
    # Token separators: anything that is not a word character, "-", "'" or ":".
    _SEPARATORS = re.compile(r"[^\w\-':]")
    # Rejects tokens of the form ":name:"; otherwise captures the part of the
    # token that starts and ends with a word character that is neither a
    # digit nor an underscore (so at least two characters long).
    _WORD = re.compile(r"(?!^:\w+:$)^[^\w]*((?![\d_])\w.*(?![\d_])\w)[^\w]*$")

    @staticmethod
    def help() -> str:
        """Return the help text displayed for the ``%words`` command."""
        return (
            "```\n"
            + "%words: Rank words by their usage\n"
            + "arguments:\n"
            + COMMON_HELP_ARGS
            + "* <n> - words containings <n> or more letters, default is 3\n"
            + "* <n2> - top <n2> words, default is 10\n"
            + "* everyone - include bots\n"
            + "Example: %words 5 10 #mychannel1 #mychannel2 @user\n"
            + "```"
        )

    def __init__(self):
        super().__init__(
            has_digit_args=True,
            valid_args=["all", "everyone"],
            help=WordsScanner.help(),
            intro_context="Words ({}+ letters)",
        )

    async def init(self, message: discord.Message, *args: str) -> bool:
        """Read the command arguments and reset the per-run state.

        The first all-digit argument is the minimum word length (default 3),
        the second one is the size of the ranking (default 10); further digit
        arguments are ignored. ``all`` or ``everyone`` enables
        ``self.all_messages``.

        Returns:
            Always True.
        """
        self.letters = None
        self.top = None
        for arg in args:
            if not arg.isdigit():
                continue
            if self.letters is None:
                self.letters = int(arg)
            elif self.top is None:
                self.top = int(arg)
        if self.letters is None:
            self.letters = 3
        if self.top is None:
            self.top = 10
        self.words = defaultdict(Counter)
        self.all_messages = "all" in args or "everyone" in args
        return True

    def compute_message(self, channel: ChannelLogs, message: MessageLog):
        """Feed one logged message to ``analyse_message`` and return its result."""
        return WordsScanner.analyse_message(
            message,
            self.words,
            self.raw_members,
            all_messages=self.all_messages,
            letters_threshold=self.letters,
        )

    def get_results(self, intro: str) -> List[str]:
        """Build the result lines: intro, the top ``self.top`` words, then a total.

        Args:
            intro: template whose ``{}`` placeholder receives ``self.letters``.
        """
        # Rank by score, highest first (stable sort, as list.sort was).
        ranked = sorted(
            self.words, key=lambda word: self.words[word].score(), reverse=True
        )[: self.top]
        # Total usage over every collected word, not only the displayed ones
        usage_count = Counter.total(self.words)
        res = [intro.format(self.letters)]
        res += [
            self.words[word].to_string(
                rank,
                f"`{word}`",
                total_usage=usage_count,
            )
            for rank, word in enumerate(ranked)
        ]
        # msg_count is 0 when messages exist but none matched the filters;
        # avoid a ZeroDivisionError in that case.
        per_message = usage_count / self.msg_count if self.msg_count else 0
        res += [
            f"Total: {plural(usage_count,'time')} ({precise(per_message)}/msg)"
        ]
        return res

    @staticmethod
    def analyse_message(
        message: MessageLog,
        words: Dict[str, Counter],
        raw_members: List[int],
        *,
        all_messages: bool,
        letters_threshold: int,
    ) -> bool:
        """Register the words of ``message`` into ``words``.

        Args:
            message: logged message; ``bot``, ``author``, ``content`` and
                ``created_at`` are read.
            words: mapping of word -> counter, updated in place. Missing keys
                are created through item access, so a ``defaultdict`` (as
                built in ``init``) is expected.
            raw_members: selected author ids; an empty list selects everyone.
            all_messages: when True, bot messages are kept even if no member
                is selected.
            letters_threshold: minimum length of a word to be counted.

        Returns:
            True when the author is part of the selection, False otherwise
            (in which case ``words`` is left untouched).
        """
        # No member filter: keep non-bot authors (or everyone with
        # all_messages). With a filter: keep only the listed authors.
        selected = (
            (not message.bot or all_messages) and len(raw_members) == 0
        ) or message.author in raw_members
        if not selected:
            return False

        content = " ".join(
            block
            for block in message.content.split()
            if not WordsScanner._URL_BLOCK.match(block)
        )
        for token in WordsScanner._SEPARATORS.split(content):
            found = WordsScanner._WORD.match(token)
            if not found:
                continue
            word = found[1].lower()
            if len(word) < letters_threshold:
                continue
            for case in WordsScanner.special_cases:
                base = word[: -len(case)]
                if word.endswith(case) and base in words:
                    # "cats" while "cat" is known: count it as "cat"
                    word = base
                    break
                if word + case in words and word not in words:
                    # "cat" while only "cats" is known: move the counter to
                    # the base word. Skipped when the base word already has
                    # a counter, which would otherwise be overwritten.
                    words[word] = words[word + case]
                    del words[word + case]
                    break
            # One use per matched token. Counting occurrences of the
            # lower-cased word in the raw content missed capitalised words,
            # matched substrings of longer words and counted repeated words
            # once per repetition.
            # NOTE(review): assumes Counter.update_use(count, date) adds
            # `count` usages - confirm against data_types.Counter.
            words[word].update_use(1, message.created_at)
        return True