multithread
This commit is contained in:
+58
-27
@@ -5,6 +5,7 @@ import json
|
|||||||
import gzip
|
import gzip
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
|
||||||
from . import ChannelLogs
|
from . import ChannelLogs
|
||||||
@@ -20,6 +21,30 @@ ALREADY_RUNNING = -100
|
|||||||
CANCELLED = -200
|
CANCELLED = -200
|
||||||
|
|
||||||
|
|
||||||
|
class Worker:
|
||||||
|
def __init__(self, channel_log: ChannelLogs, channel: discord.TextChannel):
|
||||||
|
self.channel_log = channel_log
|
||||||
|
self.channel = channel
|
||||||
|
self.start_msg = len(channel_log.messages)
|
||||||
|
self.total_msg = self.start_msg
|
||||||
|
self.queried_msg = 0
|
||||||
|
self.done = False
|
||||||
|
self.cancelled = False
|
||||||
|
self.loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
asyncio.run_coroutine_threadsafe(self.process(), self.loop)
|
||||||
|
|
||||||
|
async def process(self):
|
||||||
|
async for count, done in self.channel_log.load(self.channel):
|
||||||
|
if count > 0:
|
||||||
|
self.queried_msg = count - self.start_msg
|
||||||
|
self.total_msg = count
|
||||||
|
self.done = done
|
||||||
|
if self.cancelled:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
class GuildLogs:
|
class GuildLogs:
|
||||||
def __init__(self, guild: discord.Guild):
|
def __init__(self, guild: discord.Guild):
|
||||||
self.guild = guild
|
self.guild = guild
|
||||||
@@ -121,37 +146,43 @@ class GuildLogs:
|
|||||||
max_chan = len(target_channels)
|
max_chan = len(target_channels)
|
||||||
if self.check_cancelled():
|
if self.check_cancelled():
|
||||||
return CANCELLED, 0
|
return CANCELLED, 0
|
||||||
await code_message(
|
workers = []
|
||||||
progress,
|
|
||||||
f"Reading new history...\n0 messages in 0/{max_chan:,} channels\n(this might take a while)",
|
|
||||||
)
|
|
||||||
for channel in target_channels:
|
for channel in target_channels:
|
||||||
if channel.id not in self.channels or fresh:
|
if channel.id not in self.channels or fresh:
|
||||||
loading_new += 1
|
loading_new += 1
|
||||||
self.channels[channel.id] = ChannelLogs(channel)
|
self.channels[channel.id] = ChannelLogs(channel)
|
||||||
start_msg = len(self.channels[channel.id].messages)
|
workers += [Worker(self.channels[channel.id], channel)]
|
||||||
count = 0
|
warning_msg = "(this might take a while)"
|
||||||
async for count, done in self.channels[channel.id].load(channel):
|
if len(target_channels) > 5 and loading_new > 5:
|
||||||
if count > 0:
|
warning_msg = "(most channels are new, this might take a looong while)"
|
||||||
tmp_queried_msg = queried_msg + count - start_msg
|
elif loading_new > 0:
|
||||||
tmp_msg = total_msg + count
|
warning_msg = "(some channels are new, this might take a long while)"
|
||||||
warning_msg = "(this might take a while)"
|
await code_message(
|
||||||
if len(target_channels) > 5 and loading_new > 5:
|
progress,
|
||||||
warning_msg = "(most channels are new, this might take a looong while)"
|
f"Reading new history...\n0 messages in 0/{max_chan:,} channels\n{warning_msg}",
|
||||||
elif loading_new > 0:
|
)
|
||||||
warning_msg = (
|
for worker in workers:
|
||||||
"(some channels are new, this might take a long while)"
|
worker.start()
|
||||||
)
|
done = False
|
||||||
if self.check_cancelled():
|
while not done:
|
||||||
return CANCELLED, 0
|
if self.check_cancelled():
|
||||||
await code_message(
|
for worker in workers:
|
||||||
progress,
|
worker.cancelled = True
|
||||||
f"Reading new history...\n{tmp_msg:,} messages in {total_chan + 1:,}/{max_chan:,} channels ({round(tmp_queried_msg/deltas(t0)):,}m/s)\n{warning_msg}",
|
return CANCELLED, 0
|
||||||
)
|
|
||||||
if done:
|
await asyncio.sleep(0.5)
|
||||||
total_chan += 1
|
|
||||||
total_msg += len(self.channels[channel.id].messages)
|
total_chan = sum([worker.done for worker in workers])
|
||||||
queried_msg += count - start_msg
|
queried_msg = sum([worker.queried_msg for worker in workers])
|
||||||
|
total_msg = sum([worker.total_msg for worker in workers])
|
||||||
|
|
||||||
|
if total_chan == max_chan:
|
||||||
|
done = True
|
||||||
|
|
||||||
|
await code_message(
|
||||||
|
progress,
|
||||||
|
f"Reading new history...\n{total_msg:,} messages in {total_chan:,}/{max_chan:,} channels ({round(queried_msg/deltas(t0)):,}m/s)\n{warning_msg}",
|
||||||
|
)
|
||||||
logging.info(
|
logging.info(
|
||||||
f"log {self.guild.id} > queried in {delta(t0):,}ms -> {queried_msg / deltas(t0):,.3f} m/s"
|
f"log {self.guild.id} > queried in {delta(t0):,}ms -> {queried_msg / deltas(t0):,.3f} m/s"
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user