refactor(all): start from new
feat(doc): add readme file
This commit is contained in:
parent
28d1d71c5a
commit
078dc075f2
22 changed files with 120 additions and 311 deletions
tuxbot
14
tuxbot/__init__.py
Normal file
14
tuxbot/__init__.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
import subprocess
|
||||
from collections import namedtuple
|
||||
|
||||
build = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']) \
|
||||
.decode()
|
||||
|
||||
VersionInfo = namedtuple('VersionInfo', 'major minor micro releaselevel build')
|
||||
version_info = VersionInfo(
|
||||
major=3, minor=0, micro=0,
|
||||
releaselevel='alpha', build=build
|
||||
)
|
||||
|
||||
__version__ = "v{}.{}.{}" \
|
||||
.format(version_info.major, version_info.minor, version_info.micro)
|
2
tuxbot/__main__.py
Normal file
2
tuxbot/__main__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
def main():
|
||||
...
|
5
tuxbot/cogs/images/__init__.py
Normal file
5
tuxbot/cogs/images/__init__.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
from .images import Images
|
||||
|
||||
|
||||
def setup(bot):
|
||||
bot.add_cog(Images(bot))
|
175
tuxbot/cogs/images/images.py
Normal file
175
tuxbot/cogs/images/images.py
Normal file
|
@ -0,0 +1,175 @@
|
|||
import logging
|
||||
from io import BytesIO
|
||||
|
||||
import discord
|
||||
from discord.ext import commands, flags
|
||||
|
||||
from app import TuxBot
|
||||
from utils.functions.extra import ContextPlus, command_extra
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Images(commands.Cog, name="Images"):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.image_api = "http://0.0.0.0:8080"
|
||||
|
||||
async def _send_meme(self, ctx: ContextPlus, endpoint: str, **passed_flags):
|
||||
async with ctx.typing():
|
||||
url = f"{self.image_api}/{endpoint}?"
|
||||
for key, val in passed_flags.items():
|
||||
if val:
|
||||
url += f"{key}={val}&"
|
||||
|
||||
async with self.bot.session.get(url) as r:
|
||||
if r.status != 200:
|
||||
return await ctx.send("Failed...")
|
||||
|
||||
data = BytesIO(await r.read())
|
||||
|
||||
await ctx.send(
|
||||
file=discord.File(data, "output.png")
|
||||
)
|
||||
|
||||
@command_extra(name="phcomment")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _phcomment(self, ctx: ContextPlus, user: discord.User = None, *, message: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
|
||||
async with ctx.typing():
|
||||
message = message.replace("&", "%26")
|
||||
if user is None:
|
||||
avatar = ctx.author.avatar_url_as(format='png')
|
||||
username = ctx.author.name
|
||||
else:
|
||||
avatar = user.avatar_url_as(format='png')
|
||||
username = user.name
|
||||
|
||||
url = f"{self.image_api}/ph/comment" \
|
||||
f"?image={avatar}" \
|
||||
f"&username={username}" \
|
||||
f"&message={message}"
|
||||
|
||||
async with self.bot.session.get(url) as r:
|
||||
if r.status != 200:
|
||||
return await ctx.send("Failed...")
|
||||
|
||||
data = BytesIO(await r.read())
|
||||
|
||||
await ctx.send(
|
||||
file=discord.File(data, "output.png")
|
||||
)
|
||||
|
||||
@command_extra(name="phvideo")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _phvideo(self, ctx: ContextPlus, image: str, author: discord.User, *, title: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
|
||||
async with ctx.typing():
|
||||
url = f"{self.image_api}/ph/video" \
|
||||
f"?image={image}" \
|
||||
f"&username={author.name}" \
|
||||
f"&title={title}"
|
||||
|
||||
async with self.bot.session.get(url) as r:
|
||||
if r.status != 200:
|
||||
return await ctx.send("Failed...")
|
||||
|
||||
data = BytesIO(await r.read())
|
||||
|
||||
await ctx.send(
|
||||
file=discord.File(data, "output.png")
|
||||
)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.add_flag("--text3", type=str)
|
||||
@command_extra(name="balloon")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _balloon(self, ctx: ContextPlus, **passed_flags):
|
||||
passed_flags["text3"] = passed_flags.get("text3")
|
||||
passed_flags["text4"] = passed_flags.get("text1")
|
||||
passed_flags["text5"] = passed_flags.get("text2")
|
||||
|
||||
await self._send_meme(ctx, 'balloon', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.add_flag("--text3", type=str)
|
||||
@command_extra(name="butterfly")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _butterfly(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'butterfly', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@command_extra(name="buttons")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _buttons(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'buttons', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@command_extra(name="cmm")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _cmm(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'change_my_mind', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@command_extra(name="drake")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _drake(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'drake', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str, default=False)
|
||||
@command_extra(name="fry")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _fry(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'fry', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str, default=False)
|
||||
@command_extra(name="imagination")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _imagination(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'imagination', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str, default=False)
|
||||
@command_extra(name="everywhere")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _everywhere(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'everywhere', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.add_flag("--text3", type=str)
|
||||
@command_extra(name="choice")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _choice(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'choice', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@command_extra(name="pika")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _pika(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'pika', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.add_flag("--text3", type=str)
|
||||
@command_extra(name="pkp")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _pkp(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'pkp', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@command_extra(name="puppet")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _puppet(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'puppet', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@command_extra(name="scroll_of_truth", alias=['sot'])
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _sot(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'scroll_of_truth', **passed_flags)
|
14
tuxbot/cogs/logs/__init__.py
Normal file
14
tuxbot/cogs/logs/__init__.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
import logging
|
||||
|
||||
from discord.ext import commands
|
||||
|
||||
from .logs import Logs, GatewayHandler, on_error
|
||||
|
||||
|
||||
def setup(bot):
|
||||
cog = Logs(bot)
|
||||
bot.add_cog(cog)
|
||||
|
||||
handler = GatewayHandler(cog)
|
||||
logging.getLogger().addHandler(handler)
|
||||
commands.AutoShardedBot.on_error = on_error
|
323
tuxbot/cogs/logs/logs.py
Normal file
323
tuxbot/cogs/logs/logs.py
Normal file
|
@ -0,0 +1,323 @@
|
|||
"""
|
||||
|
||||
Based on https://github.com/Rapptz/RoboDanny/blob/3d94e89ef27f702a5f57f432a9131bdfb60bb3ec/cogs/stats.py
|
||||
Adapted by Romain J.
|
||||
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import textwrap
|
||||
import traceback
|
||||
from collections import defaultdict
|
||||
|
||||
import discord
|
||||
import humanize
|
||||
import psutil
|
||||
from discord.ext import commands, tasks
|
||||
|
||||
from app import TuxBot
|
||||
from utils.functions.extra import command_extra
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GatewayHandler(logging.Handler):
|
||||
def __init__(self, cog):
|
||||
self.cog = cog
|
||||
super().__init__(logging.INFO)
|
||||
|
||||
def filter(self, record):
|
||||
return record.name == 'discord.gateway' \
|
||||
or 'Shard ID' in record.msg \
|
||||
or 'Websocket closed ' in record.msg
|
||||
|
||||
def emit(self, record):
|
||||
self.cog.add_record(record)
|
||||
|
||||
|
||||
class Logs(commands.Cog):
|
||||
|
||||
def __init__(self, bot: TuxBot):
|
||||
self.bot = bot
|
||||
self.process = psutil.Process()
|
||||
self._batch_lock = asyncio.Lock(loop=bot.loop)
|
||||
self._data_batch = []
|
||||
self._gateway_queue = asyncio.Queue(loop=bot.loop)
|
||||
self.gateway_worker.start()
|
||||
|
||||
self._resumes = []
|
||||
self._identifies = defaultdict(list)
|
||||
|
||||
def _clear_gateway_data(self):
|
||||
one_week_ago = datetime.datetime.utcnow() - datetime.timedelta(days=7)
|
||||
to_remove = [
|
||||
index for index, dt in enumerate(self._resumes)
|
||||
if dt < one_week_ago
|
||||
]
|
||||
for index in reversed(to_remove):
|
||||
del self._resumes[index]
|
||||
|
||||
for shard_id, dates in self._identifies.items():
|
||||
to_remove = [index for index, dt in enumerate(dates) if
|
||||
dt < one_week_ago]
|
||||
for index in reversed(to_remove):
|
||||
del dates[index]
|
||||
|
||||
@tasks.loop(seconds=0.0)
|
||||
async def gateway_worker(self):
|
||||
record = await self._gateway_queue.get()
|
||||
await self.notify_gateway_status(record)
|
||||
|
||||
async def register_command(self, ctx):
|
||||
if ctx.command is None:
|
||||
return
|
||||
|
||||
command = ctx.command.qualified_name
|
||||
self.bot.command_stats[command] += 1
|
||||
message = ctx.message
|
||||
if ctx.guild is None:
|
||||
destination = 'Private Message'
|
||||
guild_id = None
|
||||
else:
|
||||
destination = f'#{message.channel} ({message.guild})'
|
||||
guild_id = ctx.guild.id
|
||||
|
||||
log.info(
|
||||
f'{message.created_at}: {message.author} '
|
||||
f'in {destination}: {message.content}')
|
||||
async with self._batch_lock:
|
||||
self._data_batch.append({
|
||||
'guild': guild_id,
|
||||
'channel': ctx.channel.id,
|
||||
'author': ctx.author.id,
|
||||
'used': message.created_at.isoformat(),
|
||||
'prefix': ctx.prefix,
|
||||
'command': command,
|
||||
'failed': ctx.command_failed,
|
||||
})
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_command_completion(self, ctx):
|
||||
await self.register_command(ctx)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_socket_response(self, msg):
|
||||
self.bot.socket_stats[msg.get('t')] += 1
|
||||
|
||||
@property
|
||||
def logs(self):
|
||||
webhooks = {}
|
||||
|
||||
for key, value in self.bot.logs_channels.items():
|
||||
webhooks[key] = discord.Webhook.partial(
|
||||
id=value.get('webhook')['id'],
|
||||
token=value.get('webhook')['token'],
|
||||
adapter=discord.AsyncWebhookAdapter(
|
||||
self.bot.session
|
||||
)
|
||||
)
|
||||
|
||||
return webhooks
|
||||
|
||||
async def log_error(self, *, ctx=None, extra=None):
|
||||
e = discord.Embed(title='Error', colour=0xdd5f53)
|
||||
e.description = f'```py\n{traceback.format_exc()}\n```'
|
||||
e.add_field(name='Extra', value=extra, inline=False)
|
||||
e.timestamp = datetime.datetime.utcnow()
|
||||
|
||||
if ctx is not None:
|
||||
fmt = '{0} (ID: {0.id})'
|
||||
author = fmt.format(ctx.author)
|
||||
channel = fmt.format(ctx.channel)
|
||||
guild = 'None' if ctx.guild is None else fmt.format(ctx.guild)
|
||||
|
||||
e.add_field(name='Author', value=author)
|
||||
e.add_field(name='Channel', value=channel)
|
||||
e.add_field(name='Guild', value=guild)
|
||||
|
||||
await self.logs.get('errors').send(embed=e)
|
||||
|
||||
async def send_guild_stats(self, e, guild):
|
||||
e.add_field(name='Name', value=guild.name)
|
||||
e.add_field(name='ID', value=guild.id)
|
||||
e.add_field(name='Shard ID', value=guild.shard_id or 'N/A')
|
||||
e.add_field(name='Owner',
|
||||
value=f'{guild.owner} (ID: {guild.owner.id})')
|
||||
|
||||
bots = sum(member.bot for member in guild.members)
|
||||
total = guild.member_count
|
||||
online = sum(member.status is discord.Status.online
|
||||
for member in guild.members)
|
||||
|
||||
e.add_field(name='Members', value=str(total))
|
||||
e.add_field(name='Bots', value=f'{bots} ({bots / total:.2%})')
|
||||
e.add_field(name='Online', value=f'{online} ({online / total:.2%})')
|
||||
|
||||
if guild.icon:
|
||||
e.set_thumbnail(url=guild.icon_url)
|
||||
|
||||
if guild.me:
|
||||
e.timestamp = guild.me.joined_at
|
||||
|
||||
await self.logs.get('guilds').send(embed=e)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_guild_join(self, guild: discord.guild):
|
||||
e = discord.Embed(colour=0x53dda4, title='New Guild') # green colour
|
||||
await self.send_guild_stats(e, guild)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_guild_remove(self, guild: discord.guild):
|
||||
e = discord.Embed(colour=0xdd5f53, title='Left Guild') # red colour
|
||||
await self.send_guild_stats(e, guild)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_message(self, message: discord.message):
|
||||
ctx = await self.bot.get_context(message)
|
||||
if ctx.valid:
|
||||
return
|
||||
|
||||
if isinstance(message.channel, discord.DMChannel):
|
||||
if message.author is self.bot.user:
|
||||
e = discord.Embed(
|
||||
title=f"DM to: {message.channel.recipient}",
|
||||
description=message.content,
|
||||
color=0x39e326
|
||||
)
|
||||
else:
|
||||
e = discord.Embed(
|
||||
title="New DM:",
|
||||
description=message.content,
|
||||
color=0x0A97F5
|
||||
)
|
||||
e.set_author(
|
||||
name=message.channel.recipient,
|
||||
icon_url=message.channel.recipient.avatar_url_as(format="png")
|
||||
)
|
||||
|
||||
if message.attachments:
|
||||
attachment_url = message.attachments[0].url
|
||||
e.set_image(url=attachment_url)
|
||||
|
||||
e.set_footer(
|
||||
text=f"User ID: {message.channel.recipient.id}"
|
||||
)
|
||||
|
||||
await self.logs["dm"].send(embed=e)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_command_error(self, ctx, error):
|
||||
await self.register_command(ctx)
|
||||
if not isinstance(error, (
|
||||
commands.CommandInvokeError, commands.ConversionError)):
|
||||
return
|
||||
|
||||
error = error.original
|
||||
if isinstance(error, (discord.Forbidden, discord.NotFound)):
|
||||
return
|
||||
|
||||
e = discord.Embed(title='Command Error', colour=0xcc3366)
|
||||
e.add_field(name='Name', value=ctx.command.qualified_name)
|
||||
e.add_field(name='Author', value=f'{ctx.author} (ID: {ctx.author.id})')
|
||||
|
||||
fmt = f'Channel: {ctx.channel} (ID: {ctx.channel.id})'
|
||||
if ctx.guild:
|
||||
fmt = f'{fmt}\nGuild: {ctx.guild} (ID: {ctx.guild.id})'
|
||||
|
||||
e.add_field(name='Location', value=fmt, inline=False)
|
||||
e.add_field(name='Content', value=textwrap.shorten(
|
||||
ctx.message.content,
|
||||
width=512
|
||||
))
|
||||
|
||||
exc = ''.join(traceback.format_exception(
|
||||
type(error), error, error.__traceback__,
|
||||
chain=False)
|
||||
)
|
||||
e.description = f'```py\n{exc}\n```'
|
||||
e.timestamp = datetime.datetime.utcnow()
|
||||
await self.logs.get('errors').send(embed=e)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_socket_raw_send(self, data):
|
||||
if '"op":2' not in data and '"op":6' not in data:
|
||||
return
|
||||
|
||||
back_to_json = json.loads(data)
|
||||
if back_to_json['op'] == 2:
|
||||
payload = back_to_json['d']
|
||||
inner_shard = payload.get('shard', [0])
|
||||
self._identifies[inner_shard[0]].append(datetime.datetime.utcnow())
|
||||
else:
|
||||
self._resumes.append(datetime.datetime.utcnow())
|
||||
|
||||
self._clear_gateway_data()
|
||||
|
||||
def add_record(self, record):
|
||||
self._gateway_queue.put_nowait(record)
|
||||
|
||||
async def notify_gateway_status(self, record):
|
||||
types = {
|
||||
'INFO': ':information_source:',
|
||||
'WARNING': ':warning:'
|
||||
}
|
||||
|
||||
emoji = types.get(record.levelname, ':heavy_multiplication_x:')
|
||||
dt = datetime.datetime.utcfromtimestamp(record.created)
|
||||
msg = f'{emoji} `[{dt:%Y-%m-%d %H:%M:%S}] {record.message}`'
|
||||
await self.logs.get('gateway').send(msg)
|
||||
|
||||
@command_extra(name='commandstats')
|
||||
@commands.is_owner()
|
||||
async def _commandstats(self, ctx, limit=20):
|
||||
counter = self.bot.command_stats
|
||||
width = len(max(counter, key=len))
|
||||
|
||||
if limit > 0:
|
||||
common = counter.most_common(limit)
|
||||
else:
|
||||
common = counter.most_common()[limit:]
|
||||
|
||||
output = '\n'.join(f'{k:<{width}}: {c}' for k, c in common)
|
||||
|
||||
await ctx.send(f'```\n{output}\n```')
|
||||
|
||||
@commands.command('socketstats')
|
||||
@commands.is_owner()
|
||||
async def _socketstats(self, ctx):
|
||||
delta = datetime.datetime.utcnow() - self.bot.uptime
|
||||
minutes = delta.total_seconds() / 60
|
||||
total = sum(self.bot.socket_stats.values())
|
||||
cpm = total / minutes
|
||||
await ctx.send(
|
||||
f'{total} socket events observed ({cpm:.2f}/minute):\n'
|
||||
f'{self.bot.socket_stats}')
|
||||
|
||||
@commands.command('uptime')
|
||||
async def _uptime(self, ctx):
|
||||
uptime = humanize.naturaltime(
|
||||
datetime.datetime.utcnow() - self.bot.uptime)
|
||||
await ctx.send(f'Uptime: **{uptime}**')
|
||||
|
||||
|
||||
async def on_error(self, event, *args):
|
||||
e = discord.Embed(title='Event Error', colour=0xa32952)
|
||||
e.add_field(name='Event', value=event)
|
||||
e.description = f'```py\n{traceback.format_exc()}\n```'
|
||||
e.timestamp = datetime.datetime.utcnow()
|
||||
|
||||
args_str = ['```py']
|
||||
for index, arg in enumerate(args):
|
||||
args_str.append(f'[{index}]: {arg!r}')
|
||||
args_str.append('```')
|
||||
e.add_field(name='Args', value='\n'.join(args_str), inline=False)
|
||||
|
||||
hook = self.get_cog('Logs').logs.get('errors')
|
||||
try:
|
||||
await hook.send(embed=e)
|
||||
except (discord.HTTPException, discord.NotFound,
|
||||
discord.Forbidden, discord.InvalidArgument):
|
||||
pass
|
5
tuxbot/cogs/network/__init__.py
Normal file
5
tuxbot/cogs/network/__init__.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
from .network import Network
|
||||
|
||||
|
||||
def setup(bot):
|
||||
bot.add_cog(Network(bot))
|
105
tuxbot/cogs/network/network.py
Normal file
105
tuxbot/cogs/network/network.py
Normal file
|
@ -0,0 +1,105 @@
|
|||
import logging
|
||||
import socket
|
||||
import ipinfo
|
||||
import discord
|
||||
|
||||
from discord.ext import commands, flags
|
||||
from ipwhois import Net
|
||||
from ipwhois.asn import IPASN
|
||||
from ipinfo.exceptions import RequestQuotaExceededError
|
||||
from requests.exceptions import HTTPError
|
||||
|
||||
from app import TuxBot
|
||||
from utils.functions.extra import ContextPlus, command_extra
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Network(commands.Cog, name="Useless"):
|
||||
def __init__(self, bot: TuxBot):
|
||||
self.bot = bot
|
||||
|
||||
@flags.add_flag("-i", "--ip", type=str, default='v4',
|
||||
choices=['v4', '4', 'v6', '6'])
|
||||
@command_extra(name="iplocalise", aliases=['localiseip'])
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _iplocalise(self, ctx: ContextPlus, target: str, **passed_flags):
|
||||
loading = await ctx.send(
|
||||
"_Récupération des informations..._", deletable=False
|
||||
)
|
||||
|
||||
def get_hostname(dtl, tgt):
|
||||
try:
|
||||
return dtl.hostname
|
||||
except AttributeError:
|
||||
try:
|
||||
return socket.gethostbyaddr(tgt)[0]
|
||||
except (ValueError, socket.herror):
|
||||
return 'N/A'
|
||||
|
||||
ip_type = passed_flags.get('ip')
|
||||
target_copy = target
|
||||
|
||||
# clean https://, last /, ...
|
||||
spltTgt = target.split("://")
|
||||
target = spltTgt[
|
||||
(0, 1)[len(spltTgt) > 1]
|
||||
].split("?")[0].split('/')[0].split(':')[0].lower()
|
||||
|
||||
try:
|
||||
target = socket.getaddrinfo(
|
||||
target, None,
|
||||
socket.AF_INET if ip_type in ['v4', '4'] else socket.AF_INET6
|
||||
)[1][4][0]
|
||||
except socket.gaierror:
|
||||
return \
|
||||
await ctx.send("Erreur, cette adresse n'est pas disponible.")
|
||||
|
||||
net = Net(target)
|
||||
obj = IPASN(net)
|
||||
ip_info = obj.lookup()
|
||||
|
||||
try:
|
||||
handler = ipinfo.getHandler(self.bot._config.ipinfo)
|
||||
details = handler.getDetails(target)
|
||||
api_result = True
|
||||
except (RequestQuotaExceededError, HTTPError):
|
||||
details = None
|
||||
api_result = False
|
||||
|
||||
if api_result:
|
||||
belongs = f"{details.org}"
|
||||
|
||||
osm = f"https://www.openstreetmap.org/" \
|
||||
f"?mlat={details.latitude}" \
|
||||
f"&mlon={details.longitude}" \
|
||||
f"#map=5/{details.latitude}/{details.longitude}" \
|
||||
f"&layers=H"
|
||||
|
||||
region = f"[{details.city} - {details.region} " \
|
||||
f"({details.country})]({osm})"
|
||||
flag = f"https://www.countryflags.io/" \
|
||||
f"{details.country}/shiny/64.png"
|
||||
else:
|
||||
belongs = f"{ip_info['asn_description']} (AS{ip_info['asn']})"
|
||||
region = f"{ip_info['asn_country_code']}"
|
||||
flag = f"https://www.countryflags.io/" \
|
||||
f"{ip_info['asn_country_code']}/shiny/64.png"
|
||||
|
||||
e = discord.Embed(
|
||||
title=f"**Information sur __{target_copy}__ :**"
|
||||
f" `{target}`",
|
||||
color=0x5858d7
|
||||
)
|
||||
|
||||
e.add_field(name="Appartient à :", value=belongs)
|
||||
e.add_field(name="RIR :", value=f"{ip_info['asn_registry']}")
|
||||
e.add_field(name="Region :", value=region)
|
||||
|
||||
e.add_field(name="Nom de l'hôte :",
|
||||
value=get_hostname(details, target), inline=False)
|
||||
|
||||
e.set_thumbnail(url=flag)
|
||||
|
||||
await loading.delete()
|
||||
await ctx.send(embed=e)
|
8
tuxbot/core/__init__.py
Normal file
8
tuxbot/core/__init__.py
Normal file
|
@ -0,0 +1,8 @@
|
|||
from colorama import init
|
||||
|
||||
from .. import __version__, version_info, VersionInfo
|
||||
from .config import Config
|
||||
|
||||
__all__ = ["Config", "__version__", "version_info", "VersionInfo"]
|
||||
|
||||
init()
|
41
tuxbot/core/bot.py
Normal file
41
tuxbot/core/bot.py
Normal file
|
@ -0,0 +1,41 @@
|
|||
from pathlib import Path
|
||||
|
||||
from discord.ext import commands
|
||||
from . import Config
|
||||
|
||||
|
||||
__all__ = ["Tux"]
|
||||
|
||||
|
||||
class Tux(commands.AutoShardedBot):
|
||||
def __init__(self, *args, bot_dir: Path, **kwargs):
|
||||
self._config = Config.register_core(
|
||||
identifier=None,
|
||||
mentionnable=False
|
||||
)
|
||||
self._config.register_global(
|
||||
token=None,
|
||||
prefix=[],
|
||||
owner=None,
|
||||
whitelist=[],
|
||||
blacklist=[],
|
||||
locale="en-US",
|
||||
embeds=True,
|
||||
color=0x6E83D1,
|
||||
description="Tuxbot !",
|
||||
disabled_commands=[]
|
||||
)
|
||||
self._config.register_guild(
|
||||
prefix=[],
|
||||
whitelist=[],
|
||||
blacklist=[],
|
||||
locale="en-US",
|
||||
admin_role=[],
|
||||
mod_role=[],
|
||||
embeds=None,
|
||||
ignored=False,
|
||||
disabled_commands=[]
|
||||
)
|
||||
self._config.register_channel(
|
||||
ignored=False
|
||||
)
|
41
tuxbot/core/config.py
Normal file
41
tuxbot/core/config.py
Normal file
|
@ -0,0 +1,41 @@
|
|||
from pathlib import Path
|
||||
from typing import Any, NoReturn
|
||||
|
||||
|
||||
class Config:
|
||||
GLOBAL = "GLOBAL"
|
||||
GUILD = "GUILD"
|
||||
CHANNEL = "TEXT_CHANNEL"
|
||||
ROLE = "ROLE"
|
||||
MEMBER = "MEMBER"
|
||||
USER = "USER"
|
||||
|
||||
def __init__(self, config_dir: Path):
|
||||
self._defaults = {}
|
||||
|
||||
def __getattr__(self, item: str) -> dict:
|
||||
return getattr(self._defaults, item)
|
||||
|
||||
def _register_default(self, key: str, **kwargs: Any):
|
||||
...
|
||||
|
||||
def register_core(self, **kwargs) -> NoReturn:
|
||||
self._register_default(self.GUILD, **kwargs)
|
||||
|
||||
def register_global(self, **kwargs) -> NoReturn:
|
||||
self._register_default(self.GLOBAL, **kwargs)
|
||||
|
||||
def register_guild(self, **kwargs) -> NoReturn:
|
||||
self._register_default(self.GUILD, **kwargs)
|
||||
|
||||
def register_channel(self, **kwargs) -> NoReturn:
|
||||
self._register_default(self.CHANNEL, **kwargs)
|
||||
|
||||
def register_role(self, **kwargs) -> NoReturn:
|
||||
self._register_default(self.ROLE, **kwargs)
|
||||
|
||||
def register_member(self, **kwargs) -> NoReturn:
|
||||
self._register_default(self.MEMBER, **kwargs)
|
||||
|
||||
def register_user(self, **kwargs) -> NoReturn:
|
||||
self._register_default(self.USER, **kwargs)
|
0
tuxbot/core/models/__init__.py
Normal file
0
tuxbot/core/models/__init__.py
Normal file
89
tuxbot/core/utils/functions/cli.py
Normal file
89
tuxbot/core/utils/functions/cli.py
Normal file
|
@ -0,0 +1,89 @@
|
|||
import codecs
|
||||
import itertools
|
||||
import sys
|
||||
|
||||
|
||||
def bordered(*columns: dict) -> str:
|
||||
"""
|
||||
credits to https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/develop/redbot/core/utils/chat_formatting.py
|
||||
|
||||
Get two blocks of text in a borders.
|
||||
|
||||
Note
|
||||
----
|
||||
This will only work with a monospaced font.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*columns : `sequence` of `str`
|
||||
The columns of text, each being a list of lines in that column.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
The bordered text.
|
||||
|
||||
"""
|
||||
encoder = codecs.getencoder(sys.stdout.encoding)
|
||||
try:
|
||||
encoder("┌┐└┘─│") # border symbols
|
||||
except UnicodeEncodeError:
|
||||
ascii_border = True
|
||||
else:
|
||||
ascii_border = False
|
||||
|
||||
borders = {
|
||||
"TL": "+" if ascii_border else "┌", # Top-left
|
||||
"TR": "+" if ascii_border else "┐", # Top-right
|
||||
"BL": "+" if ascii_border else "└", # Bottom-left
|
||||
"BR": "+" if ascii_border else "┘", # Bottom-right
|
||||
"HZ": "-" if ascii_border else "─", # Horizontal
|
||||
"VT": "|" if ascii_border else "│", # Vertical
|
||||
}
|
||||
|
||||
sep = " " * 4 # Separator between boxes
|
||||
widths = tuple(
|
||||
max(
|
||||
len(row) for row in column.get('rows')
|
||||
) + 9
|
||||
for column in columns
|
||||
) # width of each col
|
||||
cols_done = [False] * len(columns) # whether or not each column is done
|
||||
lines = [""]
|
||||
|
||||
for i, column in enumerate(columns):
|
||||
lines[0] += "{TL}" + "{HZ}" + column.get('title') \
|
||||
+ "{HZ}" * (widths[i] - len(column.get('title')) - 1) \
|
||||
+ "{TR}" + sep
|
||||
|
||||
for line in itertools.zip_longest(
|
||||
*[column.get('rows') for column in columns]
|
||||
):
|
||||
row = []
|
||||
for colidx, column in enumerate(line):
|
||||
width = widths[colidx]
|
||||
done = cols_done[colidx]
|
||||
if column is None:
|
||||
if not done:
|
||||
# bottom border of column
|
||||
column = "{HZ}" * width
|
||||
row.append("{BL}" + column + "{BR}")
|
||||
cols_done[colidx] = True # mark column as done
|
||||
else:
|
||||
# leave empty
|
||||
row.append(" " * (width + 2))
|
||||
else:
|
||||
column += " " * (width - len(column)) # append padded spaces
|
||||
row.append("{VT}" + column + "{VT}")
|
||||
|
||||
lines.append(sep.join(row))
|
||||
|
||||
final_row = []
|
||||
for width, done in zip(widths, cols_done):
|
||||
if not done:
|
||||
final_row.append("{BL}" + "{HZ}" * width + "{BR}")
|
||||
else:
|
||||
final_row.append(" " * (width + 2))
|
||||
lines.append(sep.join(final_row))
|
||||
|
||||
return "\n".join(lines).format(**borders)
|
114
tuxbot/core/utils/functions/extra.py
Normal file
114
tuxbot/core/utils/functions/extra.py
Normal file
|
@ -0,0 +1,114 @@
|
|||
import ast
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
|
||||
import discord
|
||||
from discord.ext import commands, flags
|
||||
|
||||
from configs.bot.protected import protected
|
||||
from configs.bot.settings import prefixes
|
||||
|
||||
|
||||
class ContextPlus(commands.Context):
|
||||
async def send(self, content=None, *args, **kwargs):
|
||||
if content is not None:
|
||||
for value in protected:
|
||||
content = content.replace(
|
||||
str(value),
|
||||
'[Deleted]'
|
||||
)
|
||||
|
||||
if kwargs.get('content') is not None:
|
||||
for value in protected:
|
||||
kwargs['content'] = kwargs['content'].replace(
|
||||
str(value),
|
||||
'[Deleted]'
|
||||
)
|
||||
|
||||
if kwargs.get('embeds') is not None and len(kwargs.get('embeds')) > 0:
|
||||
for i, embed in enumerate(kwargs.get('embeds')):
|
||||
embed = str(kwargs.get('embed').to_dict())
|
||||
for value in protected:
|
||||
embed = embed.replace(str(value), '[Deleted]')
|
||||
kwargs['embeds'][i] = discord.Embed.from_dict(
|
||||
ast.literal_eval(embed)
|
||||
)
|
||||
|
||||
if kwargs.get('embed') is not None:
|
||||
embed = str(kwargs.get('embed').to_dict())
|
||||
for value in protected:
|
||||
embed = embed.replace(str(value), '[Deleted]')
|
||||
kwargs['embed'] = discord.Embed.from_dict(
|
||||
ast.literal_eval(embed)
|
||||
)
|
||||
|
||||
if (hasattr(self.command, 'deletable') and self.command.deletable) \
|
||||
and kwargs.pop('deletable', True):
|
||||
message = await super().send(content, *args, **kwargs)
|
||||
await message.add_reaction('🗑')
|
||||
|
||||
def check(reaction: discord.Reaction, user: discord.User):
|
||||
return user == self.author \
|
||||
and str(reaction.emoji) == '🗑' \
|
||||
and reaction.message.id == message.id
|
||||
|
||||
try:
|
||||
await self.bot.wait_for(
|
||||
'reaction_add',
|
||||
timeout=60.0,
|
||||
check=check
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
await message.remove_reaction('🗑', self.bot.user)
|
||||
else:
|
||||
await message.delete()
|
||||
return message
|
||||
else:
|
||||
return await super().send(content, *args, **kwargs)
|
||||
|
||||
|
||||
class CommandPLus(flags.FlagCommand):
|
||||
def __init__(self, function, **kwargs):
|
||||
super().__init__(function, **kwargs)
|
||||
self.deletable = kwargs.pop("deletable", True)
|
||||
|
||||
|
||||
def command_extra(*args, **kwargs):
|
||||
return commands.command(*args, **kwargs, cls=CommandPLus)
|
||||
|
||||
|
||||
class GroupPlus(flags.FlagGroup):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.deletable = kwargs.pop("deletable", True)
|
||||
|
||||
|
||||
def group_extra(*args, **kwargs):
|
||||
return commands.group(*args, **kwargs, cls=GroupPlus)
|
||||
|
||||
|
||||
async def get_prefix(bot, message):
|
||||
custom_prefix = prefixes
|
||||
if message.guild:
|
||||
path = f"configs/guilds/{str(message.guild.id)}.json"
|
||||
|
||||
if os.path.exists(path):
|
||||
with open(path) as f:
|
||||
datas = json.load(f)
|
||||
|
||||
custom_prefix = datas["Prefix"]
|
||||
|
||||
return commands.when_mentioned_or(*custom_prefix)(bot, message)
|
||||
|
||||
|
||||
def get_owners() -> list:
|
||||
with open("configs/bot/whitelist.json") as f:
|
||||
datas = json.load(f)
|
||||
|
||||
return datas['owners']
|
||||
|
||||
|
||||
def get_blacklist() -> dict:
|
||||
with open("configs/bot/blacklist.json") as f:
|
||||
return json.load(f)
|
2
tuxbot/setup.py
Normal file
2
tuxbot/setup.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
def setup():
|
||||
...
|
Loading…
Add table
Add a link
Reference in a new issue