From 66b94d2b9d41528e8a3d795f139266c35534a486 Mon Sep 17 00:00:00 2001 From: Klemek Date: Tue, 16 Mar 2021 15:38:16 +0100 Subject: [PATCH] multithread --- src/logs/guild_logs.py | 85 ++++++++++++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/src/logs/guild_logs.py b/src/logs/guild_logs.py index 135e3e5..5250a62 100644 --- a/src/logs/guild_logs.py +++ b/src/logs/guild_logs.py @@ -5,6 +5,7 @@ import json import gzip from datetime import datetime import logging +import asyncio from . import ChannelLogs @@ -20,6 +21,30 @@ ALREADY_RUNNING = -100 CANCELLED = -200 +class Worker: + def __init__(self, channel_log: ChannelLogs, channel: discord.TextChannel): + self.channel_log = channel_log + self.channel = channel + self.start_msg = len(channel_log.messages) + self.total_msg = self.start_msg + self.queried_msg = 0 + self.done = False + self.cancelled = False + self.loop = asyncio.get_event_loop() + + def start(self): + asyncio.run_coroutine_threadsafe(self.process(), self.loop) + + async def process(self): + async for count, done in self.channel_log.load(self.channel): + if count > 0: + self.queried_msg = count - self.start_msg + self.total_msg = count + self.done = done + if self.cancelled: + return + + class GuildLogs: def __init__(self, guild: discord.Guild): self.guild = guild @@ -121,37 +146,43 @@ class GuildLogs: max_chan = len(target_channels) if self.check_cancelled(): return CANCELLED, 0 - await code_message( - progress, - f"Reading new history...\n0 messages in 0/{max_chan:,} channels\n(this might take a while)", - ) + workers = [] for channel in target_channels: if channel.id not in self.channels or fresh: loading_new += 1 self.channels[channel.id] = ChannelLogs(channel) - start_msg = len(self.channels[channel.id].messages) - count = 0 - async for count, done in self.channels[channel.id].load(channel): - if count > 0: - tmp_queried_msg = queried_msg + count - start_msg - tmp_msg = total_msg + count - warning_msg = "(this might take a while)" - if len(target_channels) > 5 and loading_new > 5: - warning_msg = "(most channels are new, this might take a looong while)" - elif loading_new > 0: - warning_msg = ( - "(some channels are new, this might take a long while)" - ) - if self.check_cancelled(): - return CANCELLED, 0 - await code_message( - progress, - f"Reading new history...\n{tmp_msg:,} messages in {total_chan + 1:,}/{max_chan:,} channels ({round(tmp_queried_msg/deltas(t0)):,}m/s)\n{warning_msg}", - ) - if done: - total_chan += 1 - total_msg += len(self.channels[channel.id].messages) - queried_msg += count - start_msg + workers += [Worker(self.channels[channel.id], channel)] + warning_msg = "(this might take a while)" + if len(target_channels) > 5 and loading_new > 5: + warning_msg = "(most channels are new, this might take a looong while)" + elif loading_new > 0: + warning_msg = "(some channels are new, this might take a long while)" + await code_message( + progress, + f"Reading new history...\n0 messages in 0/{max_chan:,} channels\n{warning_msg}", + ) + for worker in workers: + worker.start() + done = False + while not done: + if self.check_cancelled(): + for worker in workers: + worker.cancelled = True + return CANCELLED, 0 + + await asyncio.sleep(0.5) + + total_chan = sum([worker.done for worker in workers]) + queried_msg = sum([worker.queried_msg for worker in workers]) + total_msg = sum([worker.total_msg for worker in workers]) + + if total_chan == max_chan: + done = True + + await code_message( + progress, + f"Reading new history...\n{total_msg:,} messages in {total_chan:,}/{max_chan:,} channels ({round(queried_msg/deltas(t0)):,}m/s)\n{warning_msg}", + ) logging.info( f"log {self.guild.id} > queried in {delta(t0):,}ms -> {queried_msg / deltas(t0):,.3f} m/s" )