improve(commands|Logs>socketstats): format output into an embed
This commit is contained in:
parent
74307c755c
commit
d3ab384de0
3 changed files with 77 additions and 30 deletions
26
tuxbot/cogs/Logs/functions/utils.py
Normal file
26
tuxbot/cogs/Logs/functions/utils.py
Normal file
|
@ -0,0 +1,26 @@
|
|||
from collections import Counter
|
||||
|
||||
|
||||
def sort_by(_events: Counter) -> dict[str, dict]:
|
||||
majors = [
|
||||
"guild",
|
||||
"channel",
|
||||
"message",
|
||||
"invite",
|
||||
"integration",
|
||||
"presence",
|
||||
"voice",
|
||||
"other",
|
||||
]
|
||||
sorted_events = {m: {} for m in majors}
|
||||
|
||||
for event, count in _events:
|
||||
done = False
|
||||
for m in majors:
|
||||
if event.lower().startswith(m):
|
||||
sorted_events[m][event] = count
|
||||
done = True
|
||||
if not done:
|
||||
sorted_events["other"][event] = count
|
||||
|
||||
return sorted_events
|
|
@ -23,6 +23,7 @@ from tuxbot.core.utils.functions.extra import (
|
|||
)
|
||||
from tuxbot.core.utils.data_manager import cogs_data_path
|
||||
from .config import LogsConfig
|
||||
from .functions.utils import sort_by
|
||||
|
||||
log = logging.getLogger("tuxbot.cogs.Logs")
|
||||
_ = Translator("Logs", __file__)
|
||||
|
@ -279,7 +280,6 @@ class Logs(commands.Cog, name="Logs"):
|
|||
await ctx.send(f"```\n{output}\n```")
|
||||
|
||||
@command_extra(name="socketstats", hidden=True, deletable=True)
|
||||
@commands.is_owner()
|
||||
async def _socketstats(self, ctx: ContextPlus):
|
||||
delta = datetime.datetime.now() - self.bot.uptime
|
||||
minutes = delta.total_seconds() / 60
|
||||
|
@ -287,19 +287,31 @@ class Logs(commands.Cog, name="Logs"):
|
|||
counter = self.bot.stats["socket"]
|
||||
if None in counter:
|
||||
counter.pop(None)
|
||||
width = len(max(counter, key=len)) + 1
|
||||
common = counter.most_common()
|
||||
|
||||
total = sum(self.bot.stats["socket"].values())
|
||||
cpm = total / minutes
|
||||
|
||||
output = "\n".join(f"{k:<{width}}: {c}" for k, c in common)
|
||||
|
||||
await ctx.send(
|
||||
f"{total} socket events observed ({cpm:.2f}/minute):"
|
||||
f"```\n{output}\n```"
|
||||
e = discord.Embed(
|
||||
title=_("Sockets stats", ctx, self.bot.config),
|
||||
description=_(
|
||||
"{} socket events observed ({:.2f}/minute):",
|
||||
ctx,
|
||||
self.bot.config,
|
||||
).format(total, cpm),
|
||||
color=discord.colour.Color.green(),
|
||||
)
|
||||
|
||||
for major, events in sort_by(counter.most_common()).items():
|
||||
if events:
|
||||
output = "\n".join(f"{k}: {v}" for k, v in events.items())
|
||||
e.add_field(
|
||||
name=major.capitalize(),
|
||||
value=f"```\n{output}\n```",
|
||||
inline=False,
|
||||
)
|
||||
|
||||
await ctx.send(embed=e)
|
||||
|
||||
@command_extra(name="uptime")
|
||||
async def _uptime(self, ctx: ContextPlus):
|
||||
uptime = humanize.naturaltime(
|
||||
|
|
|
@ -100,12 +100,14 @@ class Tux(commands.AutoShardedBot):
|
|||
self.uptime = None
|
||||
self._app_owners_fetched = False # to prevent abusive API calls
|
||||
|
||||
self.before_invoke(self._typing)
|
||||
|
||||
super().__init__(
|
||||
*args, help_command=None, intents=discord.Intents.all(), **kwargs
|
||||
)
|
||||
self.session = aiohttp.ClientSession(loop=self.loop)
|
||||
|
||||
async def _is_blacklister(self, message: discord.Message) -> bool:
|
||||
async def _is_blacklisted(self, message: discord.Message) -> bool:
|
||||
"""Check for blacklists."""
|
||||
if message.author.bot:
|
||||
return True
|
||||
|
@ -121,6 +123,10 @@ class Tux(commands.AutoShardedBot):
|
|||
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
async def _typing(ctx: ContextPlus) -> None:
|
||||
await ctx.trigger_typing()
|
||||
|
||||
async def load_packages(self):
|
||||
if packages:
|
||||
with Progress() as progress:
|
||||
|
@ -230,30 +236,30 @@ class Tux(commands.AutoShardedBot):
|
|||
|
||||
# pylint: disable=unused-argument
|
||||
async def get_context(self, message: discord.Message, *, cls=None):
|
||||
return await super().get_context(message, cls=ContextPlus)
|
||||
ctx: ContextPlus = await super().get_context(message, cls=ContextPlus)
|
||||
|
||||
if (ctx is None or not ctx.valid) and (
|
||||
user_aliases := search_for(
|
||||
self.config.Users, message.author.id, "aliases"
|
||||
)
|
||||
):
|
||||
# noinspection PyUnboundLocalVariable
|
||||
for alias, command in user_aliases.items():
|
||||
back_content = message.content
|
||||
message.content = message.content.replace(alias, command, 1)
|
||||
|
||||
if (
|
||||
ctx := await super().get_context(message, cls=ContextPlus)
|
||||
) is None or not ctx.valid:
|
||||
message.content = back_content
|
||||
else:
|
||||
break
|
||||
|
||||
return ctx
|
||||
|
||||
async def process_commands(self, message: discord.Message):
|
||||
ctx: ContextPlus = await self.get_context(message)
|
||||
|
||||
if ctx is None or not ctx.valid:
|
||||
if user_aliases := search_for(
|
||||
self.config.Users, message.author.id, "aliases"
|
||||
):
|
||||
for alias, command in user_aliases.items():
|
||||
back_content = message.content
|
||||
message.content = message.content.replace(
|
||||
alias, command, 1
|
||||
)
|
||||
|
||||
if (
|
||||
ctx := await self.get_context(message)
|
||||
) is None or not ctx.valid:
|
||||
message.content = back_content
|
||||
else:
|
||||
break
|
||||
|
||||
self.dispatch("message_without_command", message)
|
||||
|
||||
if ctx is not None and ctx.valid:
|
||||
if ctx.command in search_for(
|
||||
self.config.Servers, message.guild.id, "disabled_command", []
|
||||
|
@ -264,9 +270,12 @@ class Tux(commands.AutoShardedBot):
|
|||
raise exceptions.DisabledCommandByBotOwner
|
||||
|
||||
await self.invoke(ctx)
|
||||
else:
|
||||
self.dispatch("message_without_command", message)
|
||||
|
||||
async def on_message(self, message: discord.Message):
|
||||
await self.process_commands(message)
|
||||
if not await self._is_blacklisted(message):
|
||||
await self.process_commands(message)
|
||||
|
||||
async def start(self, token, bot): # pylint: disable=arguments-differ
|
||||
"""Connect to Discord and start all connections.
|
||||
|
|
Loading…
Reference in a new issue