@@ -18,6 +18,9 @@
|
||||
* %freq - frequency analysis
|
||||
* %compo - composition analysis
|
||||
* %pres - presence analysis
|
||||
* %first - read first message
|
||||
* %rand - read a random message
|
||||
* %last - read last message
|
||||
* %emojis - rank emotes by their usage
|
||||
* arguments:
|
||||
* <n> - top <n> emojis, default is 20
|
||||
@@ -98,6 +101,7 @@ python3 src/main.py
|
||||
## Changelog
|
||||
|
||||
* **v1.11**
|
||||
* more scans `%first`, `%rand`, `%last`
|
||||
* streak computing in `%pres`
|
||||
* **v1.10**
|
||||
* multithreading for queries
|
||||
|
||||
@@ -3,3 +3,4 @@ from .frequency import Frequency
|
||||
from .composition import Composition
|
||||
from .presence import Presence
|
||||
from .counter import Counter
|
||||
from .history import History
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
from typing import List
|
||||
import random
|
||||
|
||||
# Custom libs
|
||||
|
||||
from utils import mention, from_now, str_datetime, message_link
|
||||
|
||||
|
||||
class History:
|
||||
def __init__(self):
|
||||
self.messages = []
|
||||
|
||||
def to_string(self, *, type: str) -> List[str]:
|
||||
if len(self.messages) == 0:
|
||||
return ["There was no messages matching your filters"]
|
||||
message = None
|
||||
intro = None
|
||||
if type == "first":
|
||||
self.messages.sort(key=lambda m: m.created_at)
|
||||
message = self.messages[0]
|
||||
intro = f"First message out of {len(self.messages):,}"
|
||||
elif type == "last":
|
||||
self.messages.sort(key=lambda m: m.created_at, reverse=True)
|
||||
message = self.messages[0]
|
||||
intro = f"Last message out of {len(self.messages):,}"
|
||||
elif type == "random":
|
||||
message = random.choice(self.messages)
|
||||
intro = f"Random message out of {len(self.messages):,}"
|
||||
|
||||
text = ["> " + line for line in message.content.splitlines()]
|
||||
if message.attachment:
|
||||
text += ["> <image>" if message.image else "> <attachment>"]
|
||||
|
||||
return [
|
||||
intro,
|
||||
f"{str_datetime(message.created_at)} ({from_now(message.created_at)}) {mention(message.author)} said:",
|
||||
*text,
|
||||
f"<{message_link(message)}>",
|
||||
]
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Union, Tuple
|
||||
from typing import Union, Tuple, Any
|
||||
import discord
|
||||
|
||||
from . import MessageLog
|
||||
@@ -9,7 +9,8 @@ FORMAT = 3
|
||||
|
||||
|
||||
class ChannelLogs:
|
||||
def __init__(self, channel: Union[discord.TextChannel, dict]):
|
||||
def __init__(self, channel: Union[discord.TextChannel, dict], guild: Any):
|
||||
self.guild = guild
|
||||
if isinstance(channel, discord.TextChannel):
|
||||
self.id = channel.id
|
||||
self.name = channel.name
|
||||
@@ -27,7 +28,7 @@ class ChannelLogs:
|
||||
if channel["last_message_id"] is not None
|
||||
else None
|
||||
)
|
||||
self.messages = [MessageLog(message) for message in channel["messages"]]
|
||||
self.messages = [MessageLog(message, self) for message in channel["messages"]]
|
||||
|
||||
def is_format(self):
|
||||
return self.format == FORMAT
|
||||
@@ -44,7 +45,7 @@ class ChannelLogs:
|
||||
oldest_first=True,
|
||||
):
|
||||
self.last_message_id = message.id
|
||||
m = MessageLog(message)
|
||||
m = MessageLog(message, self)
|
||||
await m.load(message)
|
||||
self.messages.insert(0, m)
|
||||
yield len(self.messages), False
|
||||
@@ -64,7 +65,7 @@ class ChannelLogs:
|
||||
):
|
||||
done += 1
|
||||
last_message_id = message.id
|
||||
m = MessageLog(message)
|
||||
m = MessageLog(message, self)
|
||||
await m.load(message)
|
||||
self.messages += [m]
|
||||
yield len(self.messages), False
|
||||
|
||||
@@ -49,6 +49,7 @@ class Worker:
|
||||
|
||||
class GuildLogs:
|
||||
def __init__(self, guild: discord.Guild):
|
||||
self.id = guild.id
|
||||
self.guild = guild
|
||||
self.log_file = os.path.join(LOG_DIR, f"{guild.id}.logz")
|
||||
self.channels = {}
|
||||
@@ -104,7 +105,9 @@ class GuildLogs:
|
||||
return CANCELLED, 0
|
||||
await code_message(progress, "Reading saved history (4/4)...")
|
||||
t0 = datetime.now()
|
||||
self.channels = {int(id): ChannelLogs(channels[id]) for id in channels}
|
||||
self.channels = {
|
||||
int(id): ChannelLogs(channels[id], self) for id in channels
|
||||
}
|
||||
# remove invalid format
|
||||
self.channels = {
|
||||
id: self.channels[id]
|
||||
@@ -154,7 +157,7 @@ class GuildLogs:
|
||||
for channel in target_channels:
|
||||
if channel.id not in self.channels or fresh:
|
||||
loading_new += 1
|
||||
self.channels[channel.id] = ChannelLogs(channel)
|
||||
self.channels[channel.id] = ChannelLogs(channel, self)
|
||||
workers += [Worker(self.channels[channel.id], channel)]
|
||||
warning_msg = "(this might take a while)"
|
||||
if len(target_channels) > 5 and loading_new > 5:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Union
|
||||
from typing import Union, Any
|
||||
import discord
|
||||
from datetime import datetime
|
||||
|
||||
@@ -9,7 +9,8 @@ EMBED_IMAGES = ["image", "gifv"]
|
||||
|
||||
|
||||
class MessageLog:
|
||||
def __init__(self, message: Union[discord.Message, dict]):
|
||||
def __init__(self, message: Union[discord.Message, dict], channel: Any):
|
||||
self.channel = channel
|
||||
if isinstance(message, discord.Message):
|
||||
self.id = message.id
|
||||
self.created_at = message.created_at
|
||||
|
||||
+21
@@ -18,6 +18,9 @@ from scanners import (
|
||||
MessagesScanner,
|
||||
ChannelsScanner,
|
||||
ReactionsScanner,
|
||||
FirstScanner,
|
||||
RandomScanner,
|
||||
LastScanner,
|
||||
)
|
||||
from logs import GuildLogs
|
||||
|
||||
@@ -41,6 +44,24 @@ bot.register_command(
|
||||
"cancel: stop current analysis",
|
||||
"```\n" + "%cancel: Stop current analysis\n" + "```",
|
||||
)
|
||||
bot.register_command(
|
||||
"last",
|
||||
lambda *args: LastScanner().compute(*args),
|
||||
"last: read last message",
|
||||
LastScanner.help(),
|
||||
)
|
||||
bot.register_command(
|
||||
"rand(om)?",
|
||||
lambda *args: RandomScanner().compute(*args),
|
||||
"rand: read a random message",
|
||||
RandomScanner.help(),
|
||||
)
|
||||
bot.register_command(
|
||||
"first",
|
||||
lambda *args: FirstScanner().compute(*args),
|
||||
"first: read first message",
|
||||
FirstScanner.help(),
|
||||
)
|
||||
bot.register_command(
|
||||
"mentioned",
|
||||
lambda *args: MentionedScanner().compute(*args),
|
||||
|
||||
@@ -8,3 +8,6 @@ from .mentioned_scanner import MentionedScanner
|
||||
from .messages_scanner import MessagesScanner
|
||||
from .channels_scanner import ChannelsScanner
|
||||
from .reactions_scanner import ReactionsScanner
|
||||
from .first_scanner import FirstScanner
|
||||
from .last_scanner import LastScanner
|
||||
from .random_scanner import RandomScanner
|
||||
@@ -0,0 +1,19 @@
|
||||
from typing import List
|
||||
|
||||
# Custom libs
|
||||
|
||||
from .history_scanner import HistoryScanner
|
||||
|
||||
|
||||
class FirstScanner(HistoryScanner):
|
||||
@staticmethod
|
||||
def help() -> str:
|
||||
return super(FirstScanner, FirstScanner).help(
|
||||
cmd="first", text="Read first message"
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(help=FirstScanner.help())
|
||||
|
||||
def get_results(self, intro: str) -> List[str]:
|
||||
return self.history.to_string(type="first")
|
||||
@@ -0,0 +1,70 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List
|
||||
import discord
|
||||
|
||||
# Custom libs
|
||||
|
||||
from .scanner import Scanner
|
||||
from data_types import History
|
||||
from logs import ChannelLogs, MessageLog
|
||||
from utils import COMMON_HELP_ARGS
|
||||
|
||||
|
||||
class HistoryScanner(Scanner, ABC):
|
||||
@staticmethod
|
||||
def help(*, cmd: str, text: str) -> str:
|
||||
return (
|
||||
"```\n"
|
||||
+ f"%{cmd}: {text}\n"
|
||||
+ "arguments:\n"
|
||||
+ COMMON_HELP_ARGS
|
||||
+ "* all/everyone - include bots\n"
|
||||
+ "Example: %{cmd} #mychannel1 @user\n"
|
||||
+ "```"
|
||||
)
|
||||
|
||||
def __init__(self, *, help: str):
|
||||
super().__init__(
|
||||
has_digit_args=True,
|
||||
valid_args=["all", "everyone"],
|
||||
help=help,
|
||||
intro_context="",
|
||||
)
|
||||
|
||||
async def init(self, message: discord.Message, *args: str) -> bool:
|
||||
self.history = History()
|
||||
self.all_messages = "all" in args or "everyone" in args
|
||||
return True
|
||||
|
||||
def compute_message(self, channel: ChannelLogs, message: MessageLog):
|
||||
return HistoryScanner.analyse_message(
|
||||
channel,
|
||||
message,
|
||||
self.history,
|
||||
self.raw_members,
|
||||
all_messages=self.all_messages,
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
def get_results(self, intro: str):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def analyse_message(
|
||||
channel: ChannelLogs,
|
||||
message: MessageLog,
|
||||
history: History,
|
||||
raw_members: List[int],
|
||||
*,
|
||||
all_messages: bool,
|
||||
) -> bool:
|
||||
impacted = False
|
||||
# If author is included in the selection (empty list is all)
|
||||
if (
|
||||
(not message.bot or all_messages)
|
||||
and len(raw_members) == 0
|
||||
or message.author in raw_members
|
||||
) and (message.content or message.attachment):
|
||||
impacted = True
|
||||
history.messages += [message]
|
||||
return impacted
|
||||
@@ -0,0 +1,19 @@
|
||||
from typing import List
|
||||
|
||||
# Custom libs
|
||||
|
||||
from .history_scanner import HistoryScanner
|
||||
|
||||
|
||||
class LastScanner(HistoryScanner):
|
||||
@staticmethod
|
||||
def help() -> str:
|
||||
return super(LastScanner, LastScanner).help(
|
||||
cmd="last", text="Read last message"
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(help=LastScanner.help())
|
||||
|
||||
def get_results(self, intro: str) -> List[str]:
|
||||
return self.history.to_string(type="last")
|
||||
@@ -0,0 +1,19 @@
|
||||
from typing import List
|
||||
|
||||
# Custom libs
|
||||
|
||||
from .history_scanner import HistoryScanner
|
||||
|
||||
|
||||
class RandomScanner(HistoryScanner):
|
||||
@staticmethod
|
||||
def help() -> str:
|
||||
return super(RandomScanner, RandomScanner).help(
|
||||
cmd="rand", text="Read a random message"
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(help=RandomScanner.help())
|
||||
|
||||
def get_results(self, intro: str) -> List[str]:
|
||||
return self.history.to_string(type="random")
|
||||
@@ -5,7 +5,7 @@ import logging
|
||||
import re
|
||||
import discord
|
||||
|
||||
from utils import no_duplicate, get_intro, delta, deltas, mention, channel_mention
|
||||
from utils import no_duplicate, get_intro, delta
|
||||
from logs import GuildLogs, ChannelLogs, MessageLog, ALREADY_RUNNING, CANCELLED
|
||||
|
||||
|
||||
@@ -173,5 +173,5 @@ class Scanner(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_results(self, intro: str):
|
||||
def get_results(self, intro: str) -> List[str]:
|
||||
pass
|
||||
|
||||
@@ -51,6 +51,10 @@ def channel_mention(channel_id: int) -> str:
|
||||
return f"<#{channel_id}>"
|
||||
|
||||
|
||||
def message_link(message: discord.Message) -> str:
|
||||
return f"https://discord.com/channels/{message.channel.guild.id}/{message.channel.id}/{message.id}"
|
||||
|
||||
|
||||
class FakeMessage:
|
||||
def __init__(self, id: int):
|
||||
self.id = id
|
||||
|
||||
Reference in New Issue
Block a user