From 91ae6ed383ea39b7edab548a48546be4c5a3ed74 Mon Sep 17 00:00:00 2001 From: Klemek Date: Wed, 7 Apr 2021 14:55:54 +0200 Subject: [PATCH 1/2] concurrent fast analysis --- README.md | 2 +- src/logs/guild_logs.py | 76 ++++++++++++++++++++++++++---------------- src/main.py | 4 +-- 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 4b9fe2d..afb1449 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ * %words - rank words by their usage * arguments: * - top words, default is 10 -* %cancel - cancel current analysis +* %cancel - cancel current analysis (not launched with fast) * Common arguments: * @member/me: filter for one or more member diff --git a/src/logs/guild_logs.py b/src/logs/guild_logs.py index 7f3cecb..abe398e 100644 --- a/src/logs/guild_logs.py +++ b/src/logs/guild_logs.py @@ -53,12 +53,13 @@ class GuildLogs: self.guild = guild self.log_file = os.path.join(LOG_DIR, f"{guild.id}.logz") self.channels = {} + self.locked = False def dict(self) -> dict: return {id: self.channels[id].dict() for id in self.channels} def check_cancelled(self) -> bool: - return self.log_file not in current_analysis + return self.locked and self.log_file not in current_analysis async def load( self, @@ -68,12 +69,15 @@ class GuildLogs: fast: bool, fresh: bool, ) -> Tuple[int, int]: - current_analysis_lock.acquire() - if self.log_file in current_analysis: + self.locked = False + if not fast: + self.locked = True + current_analysis_lock.acquire() + if self.log_file in current_analysis: + current_analysis_lock.release() + return ALREADY_RUNNING, 0 + current_analysis.append(self.log_file) current_analysis_lock.release() - return ALREADY_RUNNING, 0 - current_analysis.append(self.log_file) - current_analysis_lock.release() t00 = datetime.now() # read logs if not os.path.exists(LOG_DIR): @@ -122,31 +126,43 @@ class GuildLogs: else: fast = False + if len(target_channels) == 0: + target_channels = ( + self.channels.values() if fast else self.guild.text_channels + ) + elif fast: + # select already loaded channels only + target_channels_tmp = [ + channel for channel in target_channels if channel.id in self.channels + ] + if len(target_channels_tmp) == 0: + fast = False + else: + target_channels = target_channels_tmp + total_msg = 0 total_chan = 0 if fast: - if len(target_channels) == 0: - total_msg = sum( - [len(channel.messages) for channel in self.channels.values()] - ) - total_chan = len(self.channels) - else: - target_channels_id = [channel.id for channel in target_channels] - total_msg = sum( - [ - len(channel.messages) - for channel in self.channels.values() - if channel.id in target_channels_id - ] - ) - total_chan = len(target_channels) + target_channels_id = [channel.id for channel in target_channels] + total_msg = sum( + [ + len(channel.messages) + for channel in self.channels.values() + if channel.id in target_channels_id + ] + ) + total_chan = len(target_channels) else: + if not self.locked: + self.locked = True + current_analysis_lock.acquire() + if self.log_file in current_analysis: + current_analysis_lock.release() + return ALREADY_RUNNING, 0 + current_analysis.append(self.log_file) + current_analysis_lock.release() # load channels t0 = datetime.now() - if len(target_channels) == 0: - target_channels = ( - self.guild.text_channels if not fast else self.channels.keys() - ) loading_new = 0 queried_msg = 0 total_chan = 0 @@ -247,9 +263,10 @@ class GuildLogs: f"Analysing...\n{total_msg:,} messages in {total_chan:,} channels", ) logging.info(f"log {self.guild.id} > TOTAL TIME: {delta(t00):,}ms") - current_analysis_lock.acquire() - current_analysis.remove(self.log_file) - current_analysis_lock.release() + if self.locked: + current_analysis_lock.acquire() + current_analysis.remove(self.log_file) + current_analysis_lock.release() return total_msg, total_chan @staticmethod @@ -262,5 +279,6 @@ class GuildLogs: else: current_analysis_lock.release() await message.channel.send( - f"No analysis are currently running on this server", reference=message + f"No cancellable analysis are currently running on this server", + reference=message, ) diff --git a/src/main.py b/src/main.py index 027a6fd..565e92d 100644 --- a/src/main.py +++ b/src/main.py @@ -42,8 +42,8 @@ bot.log_calls = True bot.register_command( "(cancel|stop)", GuildLogs.cancel, - "cancel: stop current analysis", - "```\n" + "%cancel: Stop current analysis\n" + "```", + "cancel: stop current analysis (not launched with fast)", + "```\n" + "%cancel: Stop current analysis (not launched with fast)\n" + "```", ) bot.register_command( "last", From ac782b4ea439cf9d067537dfa1a5b10b001d6702 Mon Sep 17 00:00:00 2001 From: Klemek Date: Wed, 7 Apr 2021 15:09:10 +0200 Subject: [PATCH 2/2] fast analysis if fresh --- README.md | 2 ++ src/logs/guild_logs.py | 58 ++++++++++++++++++++++++++++++------------ 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index afb1449..ec9ff4f 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,8 @@ python3 src/main.py * **v1.12** * more scans: `%words` + * concurrent `fast` analysis + * assume `fast` if last analysis is fresh * **v1.11** * more scans: `%first`, `%rand`, `%last` * streak computing in `%pres` diff --git a/src/logs/guild_logs.py b/src/logs/guild_logs.py index abe398e..afd8de0 100644 --- a/src/logs/guild_logs.py +++ b/src/logs/guild_logs.py @@ -4,6 +4,7 @@ import discord import json import gzip from datetime import datetime +import time import logging import asyncio import threading @@ -22,6 +23,8 @@ current_analysis_lock = threading.Lock() ALREADY_RUNNING = -100 CANCELLED = -200 +MIN_MODIFICATION_TIME = 5 * 60 + class Worker: def __init__(self, channel_log: ChannelLogs, channel: discord.TextChannel): @@ -61,6 +64,22 @@ class GuildLogs: def check_cancelled(self) -> bool: return self.locked and self.log_file not in current_analysis + def lock(self) -> bool: + self.locked = True + current_analysis_lock.acquire() + if self.log_file in current_analysis: + current_analysis_lock.release() + return False + current_analysis.append(self.log_file) + current_analysis_lock.release() + return True + + def unlock(self): + self.locked = False + current_analysis_lock.acquire() + current_analysis.remove(self.log_file) + current_analysis_lock.release() + async def load( self, progress: discord.Message, @@ -70,21 +89,17 @@ class GuildLogs: fresh: bool, ) -> Tuple[int, int]: self.locked = False - if not fast: - self.locked = True - current_analysis_lock.acquire() - if self.log_file in current_analysis: - current_analysis_lock.release() - return ALREADY_RUNNING, 0 - current_analysis.append(self.log_file) - current_analysis_lock.release() + if not fast and not self.lock(): + return ALREADY_RUNNING, 0 t00 = datetime.now() # read logs if not os.path.exists(LOG_DIR): os.mkdir(LOG_DIR) + last_time = None if os.path.exists(self.log_file): channels = {} try: + last_time = os.path.getmtime(self.log_file) gziped_data = None await code_message(progress, "Reading saved history (1/4)...") t0 = datetime.now() @@ -140,6 +155,23 @@ class GuildLogs: else: target_channels = target_channels_tmp + # assume fast if file is fresh + if ( + not fast + and not fresh + and last_time is not None + and (time.time() - last_time) < MIN_MODIFICATION_TIME + ): + invalid_target_channels = [ + channel + for channel in target_channels + if channel.id not in self.channels + ] + if len(invalid_target_channels) == 0: + fast = True + if self.locked: + self.unlock() + total_msg = 0 total_chan = 0 if fast: @@ -153,14 +185,8 @@ class GuildLogs: ) total_chan = len(target_channels) else: - if not self.locked: - self.locked = True - current_analysis_lock.acquire() - if self.log_file in current_analysis: - current_analysis_lock.release() - return ALREADY_RUNNING, 0 - current_analysis.append(self.log_file) - current_analysis_lock.release() + if not self.locked and not self.lock(): + return ALREADY_RUNNING, 0 # load channels t0 = datetime.now() loading_new = 0