fix(commands:ralgo|Crypto): set ralgo as async
This commit is contained in:
parent
edfeadb872
commit
34e32fdf68
9 changed files with 73 additions and 42 deletions
|
@ -1,12 +1,10 @@
|
|||
import functools
|
||||
import logging
|
||||
from io import BytesIO
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from ralgo.ralgo import Ralgo
|
||||
|
||||
from tuxbot.cogs.Crypto.functions.extractor import extract
|
||||
from tuxbot.cogs.Crypto.functions.file import find_ext
|
||||
from tuxbot.cogs.Crypto.functions.sync import encode
|
||||
from tuxbot.core.bot import Tux
|
||||
from tuxbot.core.i18n import (
|
||||
Translator,
|
||||
|
@ -30,50 +28,33 @@ class Crypto(commands.Cog, name="Crypto"):
|
|||
@_ralgo.command(name="encode")
|
||||
async def _ralgo_encode(self, ctx: ContextPlus, *, data: str = None):
|
||||
try:
|
||||
params = await extract(ctx.message.attachments, data, 100000)
|
||||
params = await extract(ctx.message.attachments, data, 10000)
|
||||
except ValueError:
|
||||
return await ctx.send("Invalid data provided")
|
||||
|
||||
statement = Ralgo(params["message"])
|
||||
params = params["params"]
|
||||
encoded = statement.encode(chars=params["chars"])
|
||||
async with ctx.typing():
|
||||
output = await self.bot.loop.run_in_executor(
|
||||
None, functools.partial(encode, params)
|
||||
)
|
||||
|
||||
if params["compressed"]:
|
||||
return await ctx.send(str(encoded.compress()))
|
||||
if params["graphical"]:
|
||||
output = encoded.graphical().encode()
|
||||
file = discord.File(BytesIO(output.to_bytes()), "output.png")
|
||||
return await ctx.send(file=output)
|
||||
|
||||
return await ctx.send(file=file)
|
||||
|
||||
await ctx.send(str(encoded))
|
||||
await ctx.send(output)
|
||||
|
||||
@_ralgo.command(name="decode")
|
||||
async def _ralgo_decode(self, ctx: ContextPlus, *, data: str = None):
|
||||
try:
|
||||
params = await extract(ctx.message.attachments, data, 5000000)
|
||||
params = await extract(ctx.message.attachments, data, 100000)
|
||||
except ValueError:
|
||||
return await ctx.send("Invalid data provided")
|
||||
|
||||
statement = Ralgo(params["message"])
|
||||
params = params["params"]
|
||||
async with ctx.typing():
|
||||
output = await self.bot.loop.run_in_executor(
|
||||
None, functools.partial(encode, params)
|
||||
)
|
||||
|
||||
if params["graphical"]:
|
||||
output = Ralgo(statement.graphical().decode()).decode()
|
||||
if isinstance(output, discord.File):
|
||||
return await ctx.send(file=output)
|
||||
|
||||
output = discord.utils.escape_markdown(str(output))
|
||||
output = discord.utils.escape_mentions(output)
|
||||
elif params["compressed"]:
|
||||
output = Ralgo(statement.decompress()).decode()
|
||||
else:
|
||||
output = statement.decode(chars=params["chars"])
|
||||
|
||||
if isinstance(output, bytes):
|
||||
file = discord.File(BytesIO(output), f"output.{find_ext(output)}")
|
||||
|
||||
return await ctx.send(file=file)
|
||||
|
||||
output = discord.utils.escape_markdown(str(output))
|
||||
output = discord.utils.escape_mentions(output)
|
||||
|
||||
await ctx.send(output if len(output) > 0 else "no content...")
|
||||
await ctx.send(output)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# https://python-forum.io/Thread-read-a-binary-file-to-find-its-type
|
||||
def find_ext(content: bytes) -> str:
|
||||
magic_numbers = {
|
||||
'png': bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A])
|
||||
"png": bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A])
|
||||
}
|
||||
|
||||
if content.startswith(magic_numbers["png"]):
|
||||
|
|
40
tuxbot/cogs/Crypto/functions/sync.py
Normal file
40
tuxbot/cogs/Crypto/functions/sync.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
from io import BytesIO
|
||||
from typing import Union
|
||||
|
||||
import discord
|
||||
from ralgo.ralgo import Ralgo
|
||||
from tuxbot.cogs.Crypto.functions.file import find_ext
|
||||
|
||||
|
||||
def encode(params: dict) -> Union[str, discord.File]:
|
||||
statement = Ralgo(params["message"])
|
||||
params = params["params"]
|
||||
encoded = statement.encode(chars=params["chars"])
|
||||
|
||||
if params["compressed"]:
|
||||
return str(encoded.compress())
|
||||
if params["graphical"]:
|
||||
output = encoded.graphical().encode()
|
||||
return discord.File(BytesIO(output.to_bytes()), "output.png")
|
||||
|
||||
return str(encoded)
|
||||
|
||||
|
||||
def decode(params: dict) -> Union[str, discord.File]:
|
||||
statement = Ralgo(params["message"])
|
||||
params = params["params"]
|
||||
|
||||
if params["graphical"]:
|
||||
output = Ralgo(statement.graphical().decode()).decode()
|
||||
elif params["compressed"]:
|
||||
output = Ralgo(statement.decompress()).decode()
|
||||
else:
|
||||
output = statement.decode(chars=params["chars"])
|
||||
|
||||
if isinstance(output, bytes):
|
||||
return discord.File(BytesIO(output), f"output.{find_ext(output)}")
|
||||
|
||||
output = discord.utils.escape_markdown(str(output))
|
||||
output = discord.utils.escape_mentions(output)
|
||||
|
||||
return output if len(output) > 0 else "no content..."
|
|
@ -25,6 +25,7 @@ from tuxbot.core.utils.functions.extra import (
|
|||
from tuxbot.core.utils.data_manager import cogs_data_path
|
||||
from .config import LogsConfig
|
||||
from .functions.utils import sort_by
|
||||
from ...core.utils.functions.utils import shorten
|
||||
|
||||
log = logging.getLogger("tuxbot.cogs.Logs")
|
||||
_ = Translator("Logs", __file__)
|
||||
|
@ -125,7 +126,7 @@ class Logs(commands.Cog, name="Logs"):
|
|||
|
||||
emoji = types.get(record.levelname, ":heavy_multiplication_x:")
|
||||
dt = datetime.datetime.utcfromtimestamp(record.created)
|
||||
msg = f"{emoji} `[{dt:%Y-%m-%d %H:%M:%S}] {record.message}`"
|
||||
msg = f"{emoji} `[{dt:%Y-%m-%d %H:%M:%S}] {await shorten(self.bot.session, record.msg, 1500)}`"
|
||||
await self.webhook("gateway").send(msg)
|
||||
|
||||
def clear_gateway_data(self):
|
||||
|
|
|
@ -168,7 +168,7 @@ class Network(commands.Cog, name="Network"):
|
|||
headers.pop("Set-Cookie", headers)
|
||||
|
||||
for key, value in headers.items():
|
||||
output = await shorten(ctx, value, 50)
|
||||
output = await shorten(ctx.session, value, 50)
|
||||
|
||||
if output["link"] is not None:
|
||||
value = _(
|
||||
|
|
|
@ -42,7 +42,7 @@ packages: List[str] = [
|
|||
"tuxbot.cogs.Polls",
|
||||
"tuxbot.cogs.Custom",
|
||||
"tuxbot.cogs.Network",
|
||||
"tuxbot.cogs.Crypto",
|
||||
# "tuxbot.cogs.Crypto",
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ class Config(Structure):
|
|||
prefixes: List[str] = []
|
||||
token: str = StrField("")
|
||||
ip: str = StrField("")
|
||||
ip6: str = StrField("")
|
||||
mentionable: bool = BoolField("")
|
||||
locale: str = StrField("")
|
||||
disabled_command: List[str] = []
|
||||
|
|
|
@ -39,6 +39,7 @@ class ContextPlus(commands.Context):
|
|||
PASSWORD_REPLACEMENT,
|
||||
)
|
||||
.replace(self.bot.config.Core.ip, IP_REPLACEMENT)
|
||||
.replace(self.bot.config.Core.ip6, IP_REPLACEMENT)
|
||||
)
|
||||
|
||||
if len(content) > 1800:
|
||||
|
@ -57,6 +58,7 @@ class ContextPlus(commands.Context):
|
|||
PASSWORD_REPLACEMENT,
|
||||
)
|
||||
.replace(self.bot.config.Core.ip, IP_REPLACEMENT)
|
||||
.replace(self.bot.config.Core.ip6, IP_REPLACEMENT)
|
||||
)
|
||||
elif isinstance(value, list):
|
||||
e[key] = replace_in_list(
|
||||
|
@ -70,6 +72,9 @@ class ContextPlus(commands.Context):
|
|||
e[key] = replace_in_list(
|
||||
e[key], self.bot.config.Core.ip, IP_REPLACEMENT
|
||||
)
|
||||
e[key] = replace_in_list(
|
||||
e[key], self.bot.config.Core.ip6, IP_REPLACEMENT
|
||||
)
|
||||
elif isinstance(value, dict):
|
||||
e[key] = replace_in_dict(
|
||||
value, self.bot.config.Core.token, TOKEN_REPLACEMENT
|
||||
|
@ -82,6 +87,9 @@ class ContextPlus(commands.Context):
|
|||
e[key] = replace_in_dict(
|
||||
e[key], self.bot.config.Core.ip, IP_REPLACEMENT
|
||||
)
|
||||
e[key] = replace_in_dict(
|
||||
e[key], self.bot.config.Core.ip6, IP_REPLACEMENT
|
||||
)
|
||||
embed = Embed.from_dict(e)
|
||||
|
||||
if (
|
||||
|
|
|
@ -26,13 +26,13 @@ def typing(func):
|
|||
return wrapped
|
||||
|
||||
|
||||
async def shorten(ctx: ContextPlus, text: str, length: int) -> dict:
|
||||
async def shorten(session, text: str, length: int) -> dict:
|
||||
output = {"text": text[:length], "link": None}
|
||||
|
||||
if len(text) > length:
|
||||
output["text"] += "[...]"
|
||||
try:
|
||||
async with ctx.session.post(
|
||||
async with session.post(
|
||||
"https://paste.ramle.be/documents",
|
||||
data=text.encode(),
|
||||
timeout=aiohttp.ClientTimeout(total=2),
|
||||
|
|
Loading…
Reference in a new issue