first commit
This commit is contained in:
parent
7eac932a4e
commit
04645ec639
76 changed files with 792 additions and 4532 deletions
56
cogs/API.py
56
cogs/API.py
|
@ -1,56 +0,0 @@
|
|||
import logging
|
||||
|
||||
import discord
|
||||
from aiohttp import web
|
||||
from discord.ext import commands
|
||||
|
||||
from bot import TuxBot
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class API(commands.Cog):
|
||||
|
||||
def __init__(self, bot: TuxBot):
|
||||
self.bot = bot
|
||||
self.site = web.TCPSite
|
||||
|
||||
app = web.Application()
|
||||
app.add_routes([web.get('/users/{user_id}', self.users)])
|
||||
|
||||
self.runner = web.AppRunner(app)
|
||||
self.bot.loop.create_task(self.start_HTTPMonitoring_server())
|
||||
|
||||
async def start_HTTPMonitoring_server(self):
|
||||
host = self.bot.config.get('API', 'Host')
|
||||
port = self.bot.config.get('API', 'Port')
|
||||
|
||||
print(f"Starting API server on {host}:{port}")
|
||||
|
||||
await self.runner.setup()
|
||||
self.site = web.TCPSite(self.runner, host, port)
|
||||
await self.site.start()
|
||||
|
||||
async def users(self, request):
|
||||
try:
|
||||
user = await self.bot.fetch_user(request.match_info['user_id'])
|
||||
except discord.NotFound:
|
||||
return web.Response(status=404)
|
||||
|
||||
json = {
|
||||
'id': user.id,
|
||||
'username': user.name,
|
||||
'discriminator': user.discriminator,
|
||||
'avatar': user.avatar,
|
||||
'default_avatar': user.default_avatar.value,
|
||||
'bot': user.bot,
|
||||
'system': user.system,
|
||||
}
|
||||
|
||||
return web.json_response(
|
||||
json
|
||||
)
|
||||
|
||||
|
||||
def setup(bot: TuxBot):
|
||||
bot.add_cog(API(bot))
|
534
cogs/Admin.py
534
cogs/Admin.py
|
@ -1,534 +0,0 @@
|
|||
import asyncio
|
||||
import datetime
|
||||
import logging
|
||||
from typing import Union
|
||||
|
||||
import discord
|
||||
import humanize
|
||||
from discord.ext import commands
|
||||
|
||||
from bot import TuxBot
|
||||
from utils import Texts
|
||||
from utils.models import WarnModel
|
||||
from utils import command_extra, group_extra
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Admin(commands.Cog):
|
||||
|
||||
def __init__(self, bot: TuxBot):
|
||||
self.bot = bot
|
||||
self.icon = ":shield:"
|
||||
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/160/twitter/233/shield_1f6e1.png"
|
||||
|
||||
async def cog_check(self, ctx: commands.Context) -> bool:
|
||||
permissions: discord.Permissions = ctx.channel.permissions_for(
|
||||
ctx.author)
|
||||
|
||||
has_permission = permissions.administrator
|
||||
is_owner = await self.bot.is_owner(ctx.author)
|
||||
|
||||
return has_permission or is_owner
|
||||
|
||||
@staticmethod
|
||||
async def kick_ban_message(ctx: commands.Context,
|
||||
**kwargs) -> discord.Embed:
|
||||
member: discord.Member = kwargs.get('member')
|
||||
reason = kwargs.get(
|
||||
'reason',
|
||||
Texts('admin', ctx).get("Please enter a reason")
|
||||
)
|
||||
|
||||
if kwargs.get('type') == 'ban':
|
||||
title = '**Ban** ' + str(len(await ctx.guild.bans()))
|
||||
color = discord.Color.dark_red()
|
||||
else:
|
||||
title = '**Kick**'
|
||||
color = discord.Color.red()
|
||||
e = discord.Embed(
|
||||
title=title,
|
||||
description=reason,
|
||||
timestamp=datetime.datetime.utcnow(),
|
||||
color=color
|
||||
)
|
||||
e.set_author(
|
||||
name=f'{member.name}#{member.discriminator} ({member.id})',
|
||||
icon_url=member.avatar_url_as(format='jpg')
|
||||
)
|
||||
e.set_footer(
|
||||
text=f'{ctx.author.name}#{ctx.author.discriminator}',
|
||||
icon_url=ctx.author.avatar_url_as(format='png')
|
||||
)
|
||||
|
||||
return e
|
||||
|
||||
###########################################################################
|
||||
|
||||
@group_extra(name='say', invoke_without_command=True, category='text')
|
||||
async def _say(self, ctx: commands.Context, *, content: str):
|
||||
if ctx.invoked_subcommand is None:
|
||||
try:
|
||||
await ctx.message.delete()
|
||||
except discord.errors.Forbidden:
|
||||
pass
|
||||
|
||||
await ctx.send(content)
|
||||
|
||||
@_say.command(name='edit')
|
||||
async def _say_edit(self, ctx: commands.Context, message_id: int, *,
|
||||
content: str):
|
||||
try:
|
||||
await ctx.message.delete()
|
||||
except discord.errors.Forbidden:
|
||||
pass
|
||||
|
||||
try:
|
||||
message: discord.Message = await ctx.channel.fetch_message(
|
||||
message_id)
|
||||
await message.edit(content=content)
|
||||
except (discord.errors.NotFound, discord.errors.Forbidden):
|
||||
await ctx.send(
|
||||
Texts('utils', ctx).get("Unable to find the message"),
|
||||
delete_after=5)
|
||||
|
||||
@_say.command(name='to')
|
||||
async def _say_to(self, ctx: commands.Context,
|
||||
channel: Union[discord.TextChannel, discord.User], *,
|
||||
content):
|
||||
try:
|
||||
await ctx.message.delete()
|
||||
except discord.errors.Forbidden:
|
||||
pass
|
||||
|
||||
await channel.send(content)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='ban', category='administration')
|
||||
async def _ban(self, ctx: commands.Context, user: discord.Member, *,
|
||||
reason=""):
|
||||
try:
|
||||
member: discord.Member = await ctx.guild.fetch_member(user.id)
|
||||
|
||||
try:
|
||||
await member.ban(reason=reason)
|
||||
e: discord.Embed = await self.kick_ban_message(
|
||||
ctx,
|
||||
member=member,
|
||||
type='ban',
|
||||
reason=reason
|
||||
)
|
||||
|
||||
await ctx.send(embed=e)
|
||||
except discord.Forbidden:
|
||||
await ctx.send(
|
||||
Texts('admin', ctx).get("Unable to ban this user"),
|
||||
delete_after=5)
|
||||
except discord.errors.NotFound:
|
||||
await ctx.send(
|
||||
Texts('utils', ctx).get("Unable to find the user..."),
|
||||
delete_after=5)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='kick', category='administration')
|
||||
async def _kick(self, ctx: commands.Context, user: discord.Member, *,
|
||||
reason=""):
|
||||
try:
|
||||
member: discord.Member = await ctx.guild.fetch_member(user.id)
|
||||
|
||||
try:
|
||||
await member.kick(reason=reason)
|
||||
e: discord.Embed = await self.kick_ban_message(
|
||||
ctx,
|
||||
member=member,
|
||||
type='kick',
|
||||
reason=reason
|
||||
)
|
||||
|
||||
await ctx.send(embed=e)
|
||||
except discord.Forbidden:
|
||||
await ctx.send(
|
||||
Texts('admin', ctx).get("Unable to kick this user"),
|
||||
delete_after=5)
|
||||
except discord.errors.NotFound:
|
||||
await ctx.send(
|
||||
Texts('utils', ctx).get("Unable to find the user..."),
|
||||
delete_after=5)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='clear', category='text')
|
||||
async def _clear(self, ctx: commands.Context, count: int):
|
||||
try:
|
||||
await ctx.message.delete()
|
||||
await ctx.channel.purge(limit=count)
|
||||
except discord.errors.Forbidden:
|
||||
pass
|
||||
|
||||
###########################################################################
|
||||
|
||||
@group_extra(name='react', category='text')
|
||||
async def _react(self, ctx: commands.Context):
|
||||
if ctx.invoked_subcommand is None:
|
||||
await ctx.send_help('react')
|
||||
|
||||
@_react.command(name='add')
|
||||
async def _react_add(self, ctx: commands.Context, message_id: int, *,
|
||||
emojis: str):
|
||||
emojis: list = emojis.split(' ')
|
||||
|
||||
try:
|
||||
message: discord.Message = await ctx.channel.fetch_message(
|
||||
message_id)
|
||||
|
||||
for emoji in emojis:
|
||||
await message.add_reaction(emoji)
|
||||
except discord.errors.NotFound:
|
||||
await ctx.send(
|
||||
Texts('utils', ctx).get("Unable to find the message"),
|
||||
delete_after=5)
|
||||
|
||||
@_react.command(name='remove', aliases=['clear'])
|
||||
async def _react_remove(self, ctx: commands.Context, message_id: int):
|
||||
try:
|
||||
message: discord.Message = await ctx.channel.fetch_message(
|
||||
message_id)
|
||||
await message.clear_reactions()
|
||||
except discord.errors.NotFound:
|
||||
await ctx.send(
|
||||
Texts('utils', ctx).get("Unable to find the message"),
|
||||
delete_after=5)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@group_extra(name='delete', invoke_without_command=True, category='text')
|
||||
async def _delete(self, ctx: commands.Context, message_id: int):
|
||||
try:
|
||||
await ctx.message.delete()
|
||||
except discord.errors.Forbidden:
|
||||
pass
|
||||
|
||||
try:
|
||||
message: discord.Message = await ctx.channel.fetch_message(
|
||||
message_id)
|
||||
await message.delete()
|
||||
except (discord.errors.NotFound, discord.errors.Forbidden):
|
||||
await ctx.send(
|
||||
Texts('utils', ctx).get("Unable to find the message"),
|
||||
delete_after=5)
|
||||
|
||||
@_delete.command(name='from', aliases=['to', 'in'])
|
||||
async def _delete_from(self, ctx: commands.Context,
|
||||
channel: discord.TextChannel, message_id: int):
|
||||
try:
|
||||
await ctx.message.delete()
|
||||
except discord.errors.Forbidden:
|
||||
pass
|
||||
|
||||
try:
|
||||
message: discord.Message = await channel.fetch_message(
|
||||
message_id
|
||||
)
|
||||
await message.delete()
|
||||
except (discord.errors.NotFound, discord.errors.Forbidden):
|
||||
await ctx.send(
|
||||
Texts('utils', ctx).get("Unable to find the message"),
|
||||
delete_after=5
|
||||
)
|
||||
|
||||
###########################################################################
|
||||
|
||||
async def get_warn(self, ctx: commands.Context,
|
||||
member: discord.Member = False):
|
||||
await ctx.trigger_typing()
|
||||
|
||||
if member:
|
||||
warns = WarnModel.objects.filter(
|
||||
server_id=str(ctx.guild.id),
|
||||
user_id=member.id
|
||||
)
|
||||
else:
|
||||
warns = WarnModel.objects.filter(
|
||||
server_id=str(ctx.guild.id)
|
||||
)
|
||||
warns_list = ''
|
||||
|
||||
for warn in await warns.all():
|
||||
row_id = warn.id
|
||||
user_id = warn.user_id
|
||||
user = await self.bot.fetch_user(user_id)
|
||||
reason = warn.reason
|
||||
ago = humanize.naturaldelta(
|
||||
datetime.datetime.now() - warn.created_at
|
||||
)
|
||||
|
||||
warns_list += f"[{row_id}] **{user}**: `{reason}` *({ago} ago)*\n"
|
||||
|
||||
return warns_list, warns
|
||||
|
||||
async def add_warn(self, ctx: commands.Context, member: discord.Member,
|
||||
reason):
|
||||
|
||||
now = datetime.datetime.now()
|
||||
warn = WarnModel(server_id=ctx.guild.id, user_id=member.id,
|
||||
reason=reason,
|
||||
created_at=now)
|
||||
|
||||
self.bot.database.session.add(warn)
|
||||
self.bot.database.session.commit()
|
||||
|
||||
@group_extra(name='warn', aliases=['warns'], category='administration')
|
||||
async def _warn(self, ctx: commands.Context):
|
||||
await ctx.trigger_typing()
|
||||
if ctx.invoked_subcommand is None:
|
||||
warns_list, warns = await self.get_warn(ctx)
|
||||
e = discord.Embed(
|
||||
title=f"{warns.count()} {Texts('admin', ctx).get('last warns')}: ",
|
||||
description=warns_list
|
||||
)
|
||||
|
||||
await ctx.send(embed=e)
|
||||
|
||||
@_warn.command(name='add', aliases=['new'])
|
||||
async def _warn_add(self, ctx: commands.Context, member: discord.Member,
|
||||
*, reason="N/A"):
|
||||
if not member:
|
||||
return await ctx.send(
|
||||
Texts('utils', ctx).get("Unable to find the user...")
|
||||
)
|
||||
|
||||
def check(pld: discord.RawReactionActionEvent):
|
||||
if pld.message_id != choice.id \
|
||||
or pld.user_id != ctx.author.id:
|
||||
return False
|
||||
return pld.emoji.name in ('1⃣', '2⃣', '3⃣')
|
||||
|
||||
warns_list, warns = await self.get_warn(ctx, member)
|
||||
|
||||
if warns.count() >= 2:
|
||||
e = discord.Embed(
|
||||
title=Texts('admin', ctx).get('More than 2 warns'),
|
||||
description=f"{member.mention} "
|
||||
+ Texts('admin', ctx).get('has more than 2 warns')
|
||||
)
|
||||
e.add_field(
|
||||
name='__Actions__',
|
||||
value=':one: kick\n'
|
||||
':two: ban\n'
|
||||
':three: ' + Texts('admin', ctx).get('ignore')
|
||||
)
|
||||
|
||||
choice = await ctx.send(embed=e)
|
||||
|
||||
for reaction in ('1⃣', '2⃣', '3⃣'):
|
||||
await choice.add_reaction(reaction)
|
||||
|
||||
try:
|
||||
payload = await self.bot.wait_for(
|
||||
'raw_reaction_add',
|
||||
check=check,
|
||||
timeout=50.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
return await ctx.send(
|
||||
Texts('admin', ctx).get('Took too long. Aborting.')
|
||||
)
|
||||
finally:
|
||||
await choice.delete()
|
||||
|
||||
if payload.emoji.name == '1⃣':
|
||||
from jishaku.models import copy_context_with
|
||||
|
||||
alt_ctx = await copy_context_with(
|
||||
ctx,
|
||||
content=f"{ctx.prefix}"
|
||||
f"kick "
|
||||
f"{member} "
|
||||
f"{Texts('admin', ctx).get('More than 2 warns')}"
|
||||
)
|
||||
return await alt_ctx.command.invoke(alt_ctx)
|
||||
|
||||
elif payload.emoji.name == '2⃣':
|
||||
from jishaku.models import copy_context_with
|
||||
|
||||
alt_ctx = await copy_context_with(
|
||||
ctx,
|
||||
content=f"{ctx.prefix}"
|
||||
f"ban "
|
||||
f"{member} "
|
||||
f"{Texts('admin', ctx).get('More than 2 warns')}"
|
||||
)
|
||||
return await alt_ctx.command.invoke(alt_ctx)
|
||||
|
||||
await self.add_warn(ctx, member, reason)
|
||||
await ctx.send(
|
||||
content=f"{member.mention} "
|
||||
f"**{Texts('admin', ctx).get('got a warn')}**"
|
||||
f"\n**{Texts('admin', ctx).get('Reason')}:** `{reason}`"
|
||||
)
|
||||
|
||||
@_warn.command(name='remove', aliases=['revoke', 'del', 'delete'])
|
||||
async def _warn_remove(self, ctx: commands.Context, warn_id: int):
|
||||
warn = self.bot.database.session \
|
||||
.query(WarnModel) \
|
||||
.filter(WarnModel.id == warn_id) \
|
||||
.one()
|
||||
|
||||
self.bot.database.session.delete(warn)
|
||||
|
||||
await ctx.send(f"{Texts('admin', ctx).get('Warn with id')} `{warn_id}`"
|
||||
f" {Texts('admin', ctx).get('successfully removed')}")
|
||||
|
||||
@_warn.command(name='show', aliases=['list', 'all'])
|
||||
async def _warn_show(self, ctx: commands.Context, member: discord.Member):
|
||||
warns_list, warns = await self.get_warn(ctx, member)
|
||||
|
||||
e = discord.Embed(
|
||||
title=f"{warns.count()} {Texts('admin', ctx).get('last warns')}: ",
|
||||
description=warns_list
|
||||
)
|
||||
|
||||
await ctx.send(embed=e)
|
||||
|
||||
@_warn.command(name='edit', aliases=['change', 'modify'])
|
||||
async def _warn_edit(self, ctx: commands.Context, warn_id: int, *, reason):
|
||||
warn = self.bot.database.session \
|
||||
.query(WarnModel) \
|
||||
.filter(WarnModel.id == warn_id) \
|
||||
.one()
|
||||
warn.reason = reason
|
||||
|
||||
self.bot.database.session.commit()
|
||||
|
||||
await ctx.send(f"{Texts('admin', ctx).get('Warn with id')} `{warn_id}`"
|
||||
f" {Texts('admin', ctx).get('successfully edited')}")
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='language', aliases=['lang', 'langue', 'langage'], category='server')
|
||||
async def _language(self, ctx: commands.Context, locale: str):
|
||||
available = self.bot.database.session \
|
||||
.query(LangModel.value) \
|
||||
.filter(LangModel.key == 'available') \
|
||||
.first()[0] \
|
||||
.split(',')
|
||||
|
||||
if locale.lower() not in available:
|
||||
await ctx.send(
|
||||
Texts('admin', ctx).get('Unable to find this language'))
|
||||
else:
|
||||
current = self.bot.database.session \
|
||||
.query(LangModel) \
|
||||
.filter(LangModel.key == str(ctx.guild.id))
|
||||
|
||||
if current.count() > 0:
|
||||
current = current.one()
|
||||
current.value = locale.lower()
|
||||
self.bot.database.session.commit()
|
||||
else:
|
||||
new_row = LangModel(key=str(ctx.guild.id),
|
||||
value=locale.lower())
|
||||
self.bot.database.session.add(new_row)
|
||||
self.bot.database.session.commit()
|
||||
|
||||
await ctx.send(
|
||||
Texts('admin', ctx).get('Language changed successfully'))
|
||||
|
||||
###########################################################################
|
||||
|
||||
@group_extra(name='prefix', aliases=['prefixes'], category='server')
|
||||
async def _prefix(self, ctx: commands.Context):
|
||||
if ctx.invoked_subcommand is None:
|
||||
await ctx.send_help('prefix')
|
||||
|
||||
@_prefix.command(name='add', aliases=['set', 'new'])
|
||||
async def _prefix_add(self, ctx: commands.Context, prefix: str):
|
||||
if str(ctx.guild.id) in self.bot.prefixes:
|
||||
prefixes = self.bot.prefixes.get(
|
||||
str(ctx.guild.id), "prefixes"
|
||||
).split(
|
||||
self.bot.config.get("misc", "separator")
|
||||
)
|
||||
|
||||
if prefix in prefixes:
|
||||
return await ctx.send(
|
||||
Texts('admin', ctx).get('This prefix already exists')
|
||||
)
|
||||
else:
|
||||
prefixes.append(prefix)
|
||||
self.bot.prefixes.set(
|
||||
str(ctx.guild.id),
|
||||
"prefixes",
|
||||
self.bot.config.get("misc", "separator")
|
||||
.join(prefixes)
|
||||
)
|
||||
with open('./configs/prefixes.cfg', 'w') as configfile:
|
||||
self.bot.prefixes.write(configfile)
|
||||
else:
|
||||
self.bot.prefixes.add_section(str(ctx.guild.id))
|
||||
self.bot.prefixes.set(str(ctx.guild.id), "prefixes", prefix)
|
||||
with open('./configs/prefixes.cfg', 'w') as configfile:
|
||||
self.bot.prefixes.write(configfile)
|
||||
|
||||
await ctx.send(
|
||||
Texts('admin', ctx).get('Prefix added successfully')
|
||||
)
|
||||
|
||||
@_prefix.command(name='remove', aliases=['drop', 'del', 'delete'])
|
||||
async def _prefix_remove(self, ctx: commands.Context, prefix: str):
|
||||
if str(ctx.guild.id) in self.bot.prefixes:
|
||||
prefixes = self.bot.prefixes.get(
|
||||
str(ctx.guild.id), "prefixes"
|
||||
).split(
|
||||
self.bot.config.get("misc", "separator")
|
||||
)
|
||||
|
||||
if prefix in prefixes:
|
||||
prefixes.remove(prefix)
|
||||
self.bot.prefixes.set(
|
||||
str(ctx.guild.id),
|
||||
"prefixes",
|
||||
self.bot.config.get("misc", "separator")
|
||||
.join(prefixes)
|
||||
)
|
||||
with open('./configs/prefixes.cfg', 'w') as configfile:
|
||||
self.bot.prefixes.write(configfile)
|
||||
|
||||
return await ctx.send(
|
||||
Texts('admin', ctx).get('Prefix removed successfully')
|
||||
)
|
||||
|
||||
await ctx.send(
|
||||
Texts('admin', ctx).get('This prefix does not exist')
|
||||
)
|
||||
|
||||
@_prefix.command(name='list', aliases=['show', 'all'])
|
||||
async def _prefix_list(self, ctx: commands.Context):
|
||||
extras = ['.']
|
||||
if ctx.message.guild is not None:
|
||||
extras = []
|
||||
if str(ctx.message.guild.id) in self.bot.prefixes:
|
||||
extras.extend(
|
||||
self.bot.prefixes.get(str(ctx.message.guild.id),
|
||||
"prefixes").split(
|
||||
self.bot.config.get("misc", "separator")
|
||||
)
|
||||
)
|
||||
|
||||
prefixes = [self.bot.user.mention]
|
||||
prefixes.extend(extras)
|
||||
|
||||
if len(prefixes) <= 1:
|
||||
text = Texts('admin', ctx) \
|
||||
.get('The only prefix for this guild is :\n')
|
||||
else:
|
||||
text = Texts('admin', ctx) \
|
||||
.get('Available prefixes for this guild are :\n')
|
||||
|
||||
await ctx.send(text + "\n • ".join(prefixes))
|
||||
|
||||
|
||||
def setup(bot: TuxBot):
|
||||
bot.add_cog(Admin(bot))
|
227
cogs/Help.py
227
cogs/Help.py
|
@ -1,227 +0,0 @@
|
|||
# Created by romain at 04/01/2020
|
||||
|
||||
import logging
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
from bot import TuxBot
|
||||
from utils import Texts, GroupPlus
|
||||
from utils import FieldPages
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HelpCommand(commands.HelpCommand):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.ignore_cogs = ["Monitoring", "Help", "Jishaku"]
|
||||
self.owner_cogs = []
|
||||
self.admin_cogs = ["Admin"]
|
||||
|
||||
def command_formatting(self, e, command):
|
||||
prefix = self.context.prefix \
|
||||
if str(self.context.bot.user.id) not in self.context.prefix \
|
||||
else f"@{self.context.bot.user.name}"
|
||||
file = Texts(command.cog.qualified_name.lower() + '_help', self.context)
|
||||
|
||||
if command.parent is not None:
|
||||
description = file.get(f"_{command.parent}_{command.name}")
|
||||
usage = file.get(f"_{command.parent}_{command.name}__usage")
|
||||
else:
|
||||
description = file.get(f"_{command.name}")
|
||||
usage = file.get(f"_{command.name}__usage")
|
||||
|
||||
e.title = self.get_command_signature(command) + usage
|
||||
e.description = description
|
||||
|
||||
e.add_field(
|
||||
name=Texts(
|
||||
'help', self.context
|
||||
).get(
|
||||
'command_help.params'
|
||||
),
|
||||
value=usage
|
||||
)
|
||||
e.add_field(
|
||||
name=Texts(
|
||||
'help', self.context
|
||||
).get(
|
||||
'command_help.usage'
|
||||
),
|
||||
value=f"{prefix}{command.qualified_name} " + usage
|
||||
)
|
||||
|
||||
aliases = "`" + '`, `'.join(command.aliases) + "`"
|
||||
if aliases == "``":
|
||||
aliases = Texts(
|
||||
'help', self.context
|
||||
).get(
|
||||
'command_help.no_aliases'
|
||||
)
|
||||
e.add_field(
|
||||
name=Texts(
|
||||
'help', self.context
|
||||
).get(
|
||||
'command_help.aliases'
|
||||
),
|
||||
value=aliases
|
||||
)
|
||||
|
||||
return e
|
||||
|
||||
async def send_bot_help(self, mapping):
|
||||
owners = self.context.bot.owners
|
||||
prefix = self.context.prefix \
|
||||
if str(self.context.bot.user.id) not in self.context.prefix \
|
||||
else f"@{self.context.bot.user.name} "
|
||||
|
||||
e = discord.Embed(
|
||||
color=discord.Color.blue(),
|
||||
description=Texts(
|
||||
'help', self.context
|
||||
).get(
|
||||
'main_page.description'
|
||||
)
|
||||
)
|
||||
e.set_author(
|
||||
icon_url=self.context.author.avatar_url_as(format='png'),
|
||||
name=self.context.author
|
||||
)
|
||||
e.set_footer(
|
||||
text=Texts(
|
||||
'help', self.context
|
||||
).get(
|
||||
'main_page.footer'
|
||||
).format(
|
||||
prefix
|
||||
)
|
||||
)
|
||||
|
||||
for extension in self.context.bot.cogs.values():
|
||||
if self.context.author not in owners \
|
||||
and extension.__class__.__name__ in self.owner_cogs:
|
||||
continue
|
||||
if extension.__class__.__name__ in self.ignore_cogs:
|
||||
continue
|
||||
|
||||
count = len(extension.get_commands())
|
||||
text = Texts('help', self.context).get('main_page.commands')
|
||||
|
||||
if count <= 1:
|
||||
text = text[:-1]
|
||||
|
||||
e.add_field(
|
||||
name=f"__{extension.icon} **{extension.qualified_name}**__",
|
||||
value=f"{count} {text}"
|
||||
)
|
||||
|
||||
await self.context.send(embed=e)
|
||||
|
||||
async def send_cog_help(self, cog):
|
||||
pages = {}
|
||||
prefix = self.context.prefix \
|
||||
if str(self.context.bot.user.id) not in self.context.prefix \
|
||||
else f"@{self.context.bot.user.name}"
|
||||
file = Texts(cog.qualified_name.lower() + '_help', self.context)
|
||||
|
||||
if cog.__class__.__name__ in self.owner_cogs \
|
||||
and self.context.author not in self.context.bot.owners:
|
||||
return self.command_not_found(cog.qualified_name)
|
||||
|
||||
for cmd in cog.get_commands():
|
||||
if self.context.author not in self.context.bot.owners \
|
||||
and (cmd.hidden or cmd.category == "Hidden"):
|
||||
continue
|
||||
|
||||
if cmd.category not in pages:
|
||||
pages[cmd.category] = "```asciidoc\n"
|
||||
|
||||
pages[cmd.category] \
|
||||
+= f"{cmd.name}" \
|
||||
+ ' ' * int(13 - len(cmd.name)) \
|
||||
+ f":: {file.get(f'_{cmd.name}__short')}\n"
|
||||
|
||||
if isinstance(cmd, GroupPlus):
|
||||
for group_command in cmd.commands:
|
||||
pages[cmd.category] \
|
||||
+= f"└> {group_command.name}" \
|
||||
+ ' ' * int(10 - len(group_command.name)) \
|
||||
+ f":: {file.get(f'_{group_command.parent}_{group_command.name}__short')}\n"
|
||||
for e in pages:
|
||||
pages[e] += "```"
|
||||
formatted = []
|
||||
for name, cont in pages.items():
|
||||
formatted.append((name, cont))
|
||||
footer_text = Texts('help', self.context) \
|
||||
.get('main_page.footer') \
|
||||
.format(prefix)
|
||||
|
||||
pages = FieldPages(
|
||||
self.context,
|
||||
embed_color=discord.Color.blue(),
|
||||
entries=formatted,
|
||||
title=cog.qualified_name.upper(),
|
||||
thumbnail=cog.big_icon,
|
||||
footericon=self.context.bot.user.avatar_url,
|
||||
footertext=footer_text,
|
||||
per_page=1
|
||||
)
|
||||
await pages.paginate()
|
||||
|
||||
async def send_group_help(self, group):
|
||||
if group.cog_name in self.ignore_cogs:
|
||||
return await self.send_error_message(
|
||||
self.command_not_found(group.name)
|
||||
)
|
||||
file = Texts(group.qualified_name.lower() + '_help', self.context)
|
||||
|
||||
formatted = self.command_formatting(
|
||||
discord.Embed(color=discord.Color.blue()),
|
||||
group
|
||||
)
|
||||
sub_command_list = "⠀" # this is braille, please don't touch unless you know what you're doing
|
||||
for group_command in group.commands:
|
||||
sub_command_list += f"└> **{group_command.name}** - {file.get(f'_{group_command.parent}_{group_command.name}')}\n"
|
||||
|
||||
subcommands = Texts(
|
||||
'help', self.context
|
||||
).get(
|
||||
'command_help.subcommands'
|
||||
)
|
||||
|
||||
formatted.add_field(name=subcommands, value=sub_command_list, inline=False)
|
||||
await self.context.send(embed=formatted)
|
||||
|
||||
async def send_command_help(self, command):
|
||||
if isinstance(command, commands.Group):
|
||||
return await self.send_group_help(command)
|
||||
|
||||
if command.cog_name in self.ignore_cogs:
|
||||
return await self.send_error_message(
|
||||
self.command_not_found(command.name))
|
||||
|
||||
formatted = self.command_formatting(
|
||||
discord.Embed(color=discord.Color.blue()),
|
||||
command
|
||||
)
|
||||
|
||||
await self.context.send(embed=formatted)
|
||||
|
||||
def command_not_found(self, command):
|
||||
return Texts(
|
||||
'help', self.context
|
||||
).get(
|
||||
'main_page.not_found'
|
||||
).format(
|
||||
command
|
||||
)
|
||||
|
||||
|
||||
class Help(commands.Cog):
|
||||
def __init__(self, bot: TuxBot):
|
||||
bot.help_command = HelpCommand()
|
||||
|
||||
|
||||
def setup(bot: TuxBot):
|
||||
bot.add_cog(Help(bot))
|
180
cogs/Images.py
Normal file
180
cogs/Images.py
Normal file
|
@ -0,0 +1,180 @@
|
|||
import logging
|
||||
from io import BytesIO
|
||||
|
||||
import discord
|
||||
from discord.ext import commands, flags
|
||||
|
||||
from app import TuxBot
|
||||
from utils.functions.extra import ContextPlus
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Images(commands.Cog, name="Images"):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.image_api = "http://0.0.0.0:8080"
|
||||
|
||||
async def _send_meme(self, ctx: ContextPlus, endpoint: str, **passed_flags):
|
||||
async with ctx.typing():
|
||||
url = f"{self.image_api}/{endpoint}?"
|
||||
for key, val in passed_flags.items():
|
||||
if val:
|
||||
url += f"{key}={val}&"
|
||||
|
||||
async with self.bot.session.get(url) as r:
|
||||
if r.status != 200:
|
||||
return await ctx.send("Failed...")
|
||||
|
||||
data = BytesIO(await r.read())
|
||||
|
||||
await ctx.send(
|
||||
file=discord.File(data, "output.png")
|
||||
)
|
||||
|
||||
@commands.command(name="phcomment")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _phcomment(self, ctx: ContextPlus, user: discord.User = None, *, message: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
|
||||
async with ctx.typing():
|
||||
message = message.replace("&", "%26")
|
||||
if user is None:
|
||||
avatar = ctx.author.avatar_url_as(format='png')
|
||||
username = ctx.author.name
|
||||
else:
|
||||
avatar = user.avatar_url_as(format='png')
|
||||
username = user.name
|
||||
|
||||
url = f"{self.image_api}/ph/comment" \
|
||||
f"?image={avatar}" \
|
||||
f"&username={username}" \
|
||||
f"&message={message}"
|
||||
|
||||
async with self.bot.session.get(url) as r:
|
||||
if r.status != 200:
|
||||
return await ctx.send("Failed...")
|
||||
|
||||
data = BytesIO(await r.read())
|
||||
|
||||
await ctx.send(
|
||||
file=discord.File(data, "output.png")
|
||||
)
|
||||
|
||||
@commands.command(name="phvideo")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _phvideo(self, ctx: ContextPlus, image: str, author: discord.User, *, title: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
|
||||
async with ctx.typing():
|
||||
url = f"{self.image_api}/ph/video" \
|
||||
f"?image={image}" \
|
||||
f"&username={author.name}" \
|
||||
f"&title={title}"
|
||||
|
||||
async with self.bot.session.get(url) as r:
|
||||
if r.status != 200:
|
||||
return await ctx.send("Failed...")
|
||||
|
||||
data = BytesIO(await r.read())
|
||||
|
||||
await ctx.send(
|
||||
file=discord.File(data, "output.png")
|
||||
)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.add_flag("--text3", type=str)
|
||||
@flags.command(name="balloon")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _balloon(self, ctx: ContextPlus, **passed_flags):
|
||||
passed_flags["text3"] = passed_flags.get("text3")
|
||||
passed_flags["text4"] = passed_flags.get("text1")
|
||||
passed_flags["text5"] = passed_flags.get("text2")
|
||||
|
||||
await self._send_meme(ctx, 'balloon', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.add_flag("--text3", type=str)
|
||||
@flags.command(name="butterfly")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _butterfly(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'butterfly', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.command(name="buttons")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _buttons(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'buttons', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.command(name="cmm")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _cmm(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'change_my_mind', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.command(name="drake")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _drake(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'drake', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str, default=False)
|
||||
@flags.command(name="fry")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _fry(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'fry', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str, default=False)
|
||||
@flags.command(name="imagination")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _imagination(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'imagination', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str, default=False)
|
||||
@flags.command(name="everywhere")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _everywhere(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'everywhere', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.add_flag("--text3", type=str)
|
||||
@flags.command(name="choice")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _choice(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'choice', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.command(name="pika")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _pika(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'pika', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.add_flag("--text3", type=str)
|
||||
@flags.command(name="pkp")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _pkp(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'pkp', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.add_flag("--text2", type=str)
|
||||
@flags.command(name="puppet")
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _puppet(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'puppet', **passed_flags)
|
||||
|
||||
@flags.add_flag("--text1", type=str)
|
||||
@flags.command(name="scroll_of_truth", alias=['sot'])
|
||||
@commands.cooldown(1, 5, commands.BucketType.user)
|
||||
async def _sot(self, ctx: ContextPlus, **passed_flags):
|
||||
await self._send_meme(ctx, 'scroll_of_truth', **passed_flags)
|
||||
|
||||
|
||||
def setup(bot: TuxBot):
|
||||
cog = Images(bot)
|
||||
bot.add_cog(cog)
|
76
cogs/Logs.py
76
cogs/Logs.py
|
@ -18,9 +18,7 @@ import humanize
|
|||
import psutil
|
||||
from discord.ext import commands, tasks
|
||||
|
||||
from bot import TuxBot
|
||||
from utils import Texts
|
||||
from utils import command_extra
|
||||
from app import TuxBot
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -52,9 +50,6 @@ class Logs(commands.Cog):
|
|||
self._resumes = []
|
||||
self._identifies = defaultdict(list)
|
||||
|
||||
self.icon = ":newspaper:"
|
||||
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/120/twitter/233/newspaper_1f4f0.png"
|
||||
|
||||
def _clear_gateway_data(self):
|
||||
one_week_ago = datetime.datetime.utcnow() - datetime.timedelta(days=7)
|
||||
to_remove = [
|
||||
|
@ -112,8 +107,19 @@ class Logs(commands.Cog):
|
|||
self.bot.socket_stats[msg.get('t')] += 1
|
||||
|
||||
@property
|
||||
def webhook(self):
|
||||
return self.bot.logs_webhook
|
||||
def logs(self):
|
||||
webhooks = {}
|
||||
|
||||
for key, value in self.bot.logs_channels.items():
|
||||
webhooks[key] = discord.Webhook.partial(
|
||||
id=value.get('webhook')['id'],
|
||||
token=value.get('webhook')['token'],
|
||||
adapter=discord.AsyncWebhookAdapter(
|
||||
self.bot.session
|
||||
)
|
||||
)
|
||||
|
||||
return webhooks
|
||||
|
||||
async def log_error(self, *, ctx=None, extra=None):
|
||||
e = discord.Embed(title='Error', colour=0xdd5f53)
|
||||
|
@ -131,7 +137,7 @@ class Logs(commands.Cog):
|
|||
e.add_field(name='Channel', value=channel)
|
||||
e.add_field(name='Guild', value=guild)
|
||||
|
||||
await self.webhook.send(embed=e)
|
||||
await self.logs.get('errors').send(embed=e)
|
||||
|
||||
async def send_guild_stats(self, e, guild):
|
||||
e.add_field(name='Name', value=guild.name)
|
||||
|
@ -155,7 +161,7 @@ class Logs(commands.Cog):
|
|||
if guild.me:
|
||||
e.timestamp = guild.me.joined_at
|
||||
|
||||
await self.webhook.send(embed=e)
|
||||
await self.logs.get('guilds').send(embed=e)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_guild_join(self, guild: discord.guild):
|
||||
|
@ -169,17 +175,37 @@ class Logs(commands.Cog):
|
|||
|
||||
@commands.Cog.listener()
|
||||
async def on_message(self, message: discord.message):
|
||||
if message.guild is None:
|
||||
e = discord.Embed(colour=0x0a97f5, title='New DM') # blue colour
|
||||
ctx = await self.bot.get_context(message)
|
||||
if ctx.valid:
|
||||
return
|
||||
|
||||
if isinstance(message.channel, discord.DMChannel):
|
||||
if message.author is self.bot.user:
|
||||
e = discord.Embed(
|
||||
title=f"DM to: {message.channel.recipient}",
|
||||
description=message.content,
|
||||
color=0x39e326
|
||||
)
|
||||
else:
|
||||
e = discord.Embed(
|
||||
title="New DM:",
|
||||
description=message.content,
|
||||
color=0x0A97F5
|
||||
)
|
||||
e.set_author(
|
||||
name=message.author,
|
||||
icon_url=message.author.avatar_url_as(format='png')
|
||||
name=message.channel.recipient,
|
||||
icon_url=message.channel.recipient.avatar_url_as(format="png")
|
||||
)
|
||||
e.description = message.content
|
||||
if len(message.attachments) > 0:
|
||||
e.set_image(url=message.attachments[0].url)
|
||||
e.set_footer(text=f"User ID: {message.author.id}")
|
||||
await self.webhook.send(embed=e)
|
||||
|
||||
if message.attachments:
|
||||
attachment_url = message.attachments[0].url
|
||||
e.set_image(url=attachment_url)
|
||||
|
||||
e.set_footer(
|
||||
text=f"User ID: {message.channel.recipient.id}"
|
||||
)
|
||||
|
||||
await self.logs["dm"].send(embed=e)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_command_error(self, ctx, error):
|
||||
|
@ -212,7 +238,7 @@ class Logs(commands.Cog):
|
|||
)
|
||||
e.description = f'```py\n{exc}\n```'
|
||||
e.timestamp = datetime.datetime.utcnow()
|
||||
await self.webhook.send(embed=e)
|
||||
await self.logs.get('errors').send(embed=e)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_socket_raw_send(self, data):
|
||||
|
@ -241,9 +267,9 @@ class Logs(commands.Cog):
|
|||
emoji = types.get(record.levelname, ':heavy_multiplication_x:')
|
||||
dt = datetime.datetime.utcfromtimestamp(record.created)
|
||||
msg = f'{emoji} `[{dt:%Y-%m-%d %H:%M:%S}] {record.message}`'
|
||||
await self.webhook.send(msg)
|
||||
await self.logs.get('gateway').send(msg)
|
||||
|
||||
@command_extra(name='commandstats', hidden=True, category='misc')
|
||||
@commands.command('commandstats')
|
||||
@commands.is_owner()
|
||||
async def _commandstats(self, ctx, limit=20):
|
||||
counter = self.bot.command_stats
|
||||
|
@ -258,7 +284,7 @@ class Logs(commands.Cog):
|
|||
|
||||
await ctx.send(f'```\n{output}\n```')
|
||||
|
||||
@command_extra(name='socketstats', hidden=True, category='misc')
|
||||
@commands.command('socketstats')
|
||||
@commands.is_owner()
|
||||
async def _socketstats(self, ctx):
|
||||
delta = datetime.datetime.utcnow() - self.bot.uptime
|
||||
|
@ -268,7 +294,7 @@ class Logs(commands.Cog):
|
|||
await ctx.send(
|
||||
f'{total} socket events observed ({cpm:.2f}/minute):\n{self.bot.socket_stats}')
|
||||
|
||||
@command_extra(name='uptime', category='misc')
|
||||
@commands.command('uptime')
|
||||
async def _uptime(self, ctx):
|
||||
uptime = humanize.naturaltime(
|
||||
datetime.datetime.utcnow() - self.bot.uptime)
|
||||
|
@ -287,7 +313,7 @@ async def on_error(self, event, *args):
|
|||
args_str.append('```')
|
||||
e.add_field(name='Args', value='\n'.join(args_str), inline=False)
|
||||
|
||||
hook = self.get_cog('Logs').webhook
|
||||
hook = self.get_cog('Logs').logs.get('errors')
|
||||
try:
|
||||
await hook.send(embed=e)
|
||||
except (discord.HTTPException, discord.NotFound,
|
||||
|
|
|
@ -1,110 +0,0 @@
|
|||
import logging
|
||||
import urllib.request
|
||||
from datetime import datetime
|
||||
|
||||
import discord
|
||||
from aiohttp import web
|
||||
from discord.ext import tasks, commands
|
||||
|
||||
from bot import TuxBot
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Monitoring(commands.Cog):
|
||||
|
||||
def __init__(self, bot: TuxBot):
|
||||
self.bot = bot
|
||||
self.site = web.TCPSite
|
||||
|
||||
self.ping_clusters.start()
|
||||
|
||||
app = web.Application()
|
||||
app.add_routes([web.get('/', self.handle)])
|
||||
|
||||
self.runner = web.AppRunner(app)
|
||||
self.bot.loop.create_task(self.start_HTTPMonitoring_server())
|
||||
|
||||
def cog_unload(self):
|
||||
self.ping_clusters.stop()
|
||||
|
||||
@tasks.loop(seconds=10.0)
|
||||
async def ping_clusters(self):
|
||||
for cluster in self.bot.fallbacks:
|
||||
if cluster == 'DEFAULT':
|
||||
pass
|
||||
else:
|
||||
cluster = self.bot.fallbacks[cluster]
|
||||
if not cluster.get('This', False):
|
||||
host = cluster.get('Host')
|
||||
port = cluster.get('Port')
|
||||
|
||||
try:
|
||||
req = urllib.request.urlopen(
|
||||
f"http://{host}:{port}",
|
||||
timeout=2
|
||||
)
|
||||
except Exception:
|
||||
global_channel = await self.bot.fetch_channel(
|
||||
661347412463321098
|
||||
)
|
||||
|
||||
e = discord.Embed(
|
||||
title=f"Server `{cluster.get('Name')}`",
|
||||
color=discord.colour.Color.red(),
|
||||
description=f"Server **`{cluster.get('Name')}`** with address **`http://{host}:{port}`** is down ! ",
|
||||
timestamp=datetime.now()
|
||||
)
|
||||
e.set_thumbnail(
|
||||
url='https://upload.wikimedia.org/wikipedia/commons/7/75/Erroricon404.PNG'
|
||||
)
|
||||
|
||||
await global_channel.send(embed=e)
|
||||
else:
|
||||
print(req.read().decode())
|
||||
|
||||
@ping_clusters.before_loop
|
||||
async def before_pinging(self):
|
||||
await self.bot.wait_until_ready()
|
||||
|
||||
cluster = self.bot.cluster
|
||||
host = cluster.get('Host')
|
||||
port = cluster.get('Port')
|
||||
|
||||
global_channel = await self.bot.fetch_channel(
|
||||
661347412463321098
|
||||
)
|
||||
|
||||
e = discord.Embed(
|
||||
title=f"Server `{cluster.get('Name')}`",
|
||||
color=discord.colour.Color.green(),
|
||||
description=f"Server **`{cluster.get('Name')}`** with address **`http://{host}:{port}`** is started ! ",
|
||||
timestamp=datetime.now()
|
||||
)
|
||||
e.set_thumbnail(
|
||||
url='https://upload.wikimedia.org/wikipedia/commons/thumb/d/d1/MW-Icon-CheckMark.svg/1024px-MW-Icon-CheckMark.svg.png'
|
||||
)
|
||||
|
||||
await global_channel.send(embed=e)
|
||||
|
||||
async def start_HTTPMonitoring_server(self):
|
||||
host = self.bot.cluster.get('WebPage')
|
||||
port = self.bot.cluster.get('Port')
|
||||
|
||||
print(f"Starting HTTP Monitoring server on {host}:{port}")
|
||||
|
||||
await self.runner.setup()
|
||||
self.site = web.TCPSite(self.runner, host, port)
|
||||
await self.site.start()
|
||||
|
||||
async def handle(self, _):
|
||||
return web.json_response(
|
||||
{
|
||||
'message': "I'm alive !",
|
||||
'ws': self.bot.latency * 1000
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def setup(bot: TuxBot):
|
||||
bot.add_cog(Monitoring(bot))
|
222
cogs/Poll.py
222
cogs/Poll.py
|
@ -1,222 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
from typing import Union
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from yarl import URL
|
||||
|
||||
from bot import TuxBot
|
||||
from utils import PollModel, ResponsesModel
|
||||
from utils import Texts
|
||||
from utils.functions import emotes as utils_emotes
|
||||
from utils import group_extra
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Poll(commands.Cog):
|
||||
|
||||
def __init__(self, bot: TuxBot):
|
||||
self.bot = bot
|
||||
self.icon = ":bar_chart:"
|
||||
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/120/twitter/233/bar-chart_1f4ca.png:"
|
||||
|
||||
def get_poll(self, pld) -> Union[bool, PollModel]:
|
||||
if pld.user_id != self.bot.user.id:
|
||||
poll = self.bot.database.session \
|
||||
.query(PollModel) \
|
||||
.filter(PollModel.message_id == pld.message_id)
|
||||
|
||||
if poll.count() > 0:
|
||||
poll = poll.one()
|
||||
emotes = utils_emotes.get(poll.available_choices)
|
||||
if pld.emoji.name in emotes:
|
||||
return poll
|
||||
|
||||
return False
|
||||
|
||||
async def remove_reaction(self, pld):
|
||||
channel: discord.TextChannel = self.bot.get_channel(pld.channel_id)
|
||||
message: discord.Message = await channel.fetch_message(pld.message_id)
|
||||
user: discord.User = await self.bot.fetch_user(pld.user_id)
|
||||
|
||||
await message.remove_reaction(pld.emoji.name, user)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_raw_reaction_add(self, pld: discord.RawReactionActionEvent):
|
||||
poll = self.get_poll(pld)
|
||||
|
||||
if poll:
|
||||
if poll.is_anonymous:
|
||||
try:
|
||||
await self.remove_reaction(pld)
|
||||
except discord.errors.Forbidden:
|
||||
pass
|
||||
choice = utils_emotes.get_index(pld.emoji.name)
|
||||
|
||||
responses = self.bot.database.session.query(ResponsesModel) \
|
||||
.filter(
|
||||
ResponsesModel.poll_id == poll.id,
|
||||
ResponsesModel.user == pld.user_id,
|
||||
ResponsesModel.choice == choice
|
||||
)
|
||||
|
||||
if responses.count() != 0:
|
||||
response = responses.first()
|
||||
self.bot.database.session.delete(response)
|
||||
self.bot.database.session.commit()
|
||||
else:
|
||||
response = ResponsesModel(
|
||||
user=pld.user_id,
|
||||
poll_id=poll.id,
|
||||
choice=choice
|
||||
)
|
||||
self.bot.database.session.add(response)
|
||||
self.bot.database.session.commit()
|
||||
|
||||
await self.update_poll(poll.id)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_raw_reaction_remove(self,
|
||||
pld: discord.RawReactionActionEvent):
|
||||
poll = self.get_poll(pld)
|
||||
|
||||
if poll:
|
||||
choice = utils_emotes.get_index(pld.emoji.name)
|
||||
|
||||
responses = self.bot.database.session.query(ResponsesModel) \
|
||||
.filter(
|
||||
ResponsesModel.poll_id == poll.id,
|
||||
ResponsesModel.user == pld.user_id,
|
||||
ResponsesModel.choice == choice
|
||||
)
|
||||
|
||||
if responses.count() != 0:
|
||||
response = responses.first()
|
||||
self.bot.database.session.delete(response)
|
||||
self.bot.database.session.commit()
|
||||
await self.update_poll(poll.id)
|
||||
|
||||
###########################################################################
|
||||
|
||||
async def create_poll(self, ctx: commands.Context, poll: str, anonymous):
|
||||
question = (poll.split('|')[0]).strip()
|
||||
responses = [response.strip() for response in poll.split('|')[1:]]
|
||||
emotes = utils_emotes.get(len(responses))
|
||||
|
||||
stmt = await ctx.send(Texts('poll', ctx).get('**Preparation...**'))
|
||||
|
||||
poll_row = PollModel()
|
||||
self.bot.database.session.add(poll_row)
|
||||
self.bot.database.session.flush()
|
||||
|
||||
e = discord.Embed(description=f"**{question}**")
|
||||
e.set_author(
|
||||
name=ctx.author,
|
||||
icon_url="https://cdn.gnous.eu/tuxbot/survey1.png"
|
||||
)
|
||||
for i, response in enumerate(responses):
|
||||
e.add_field(
|
||||
name=f"__{emotes[i]}` - {response.capitalize()}`__",
|
||||
value="**0** vote"
|
||||
)
|
||||
e.set_footer(text=f"ID: #{poll_row.id}")
|
||||
|
||||
poll_row.channel_id = stmt.channel.id
|
||||
poll_row.message_id = stmt.id
|
||||
poll_row.content = e.to_dict()
|
||||
poll_row.is_anonymous = anonymous
|
||||
poll_row.available_choices = len(responses)
|
||||
|
||||
self.bot.database.session.commit()
|
||||
|
||||
await stmt.edit(content='', embed=e)
|
||||
for emote in range(len(responses)):
|
||||
await stmt.add_reaction(emotes[emote])
|
||||
|
||||
async def update_poll(self, poll_id: int):
|
||||
poll = self.bot.database.session \
|
||||
.query(PollModel) \
|
||||
.filter(PollModel.id == poll_id) \
|
||||
.one()
|
||||
channel: discord.TextChannel = self.bot.get_channel(poll.channel_id)
|
||||
message: discord.Message = await channel.fetch_message(poll.message_id)
|
||||
|
||||
chart_base_url = "https://quickchart.io/chart?backgroundColor=white&c="
|
||||
chart_options = {
|
||||
'type': 'pie',
|
||||
'data': {
|
||||
'labels': [],
|
||||
'datasets': [
|
||||
{
|
||||
'data': []
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
content = json.loads(poll.content) \
|
||||
if isinstance(poll.content, str) \
|
||||
else poll.content
|
||||
raw_responses = self.bot.database.session \
|
||||
.query(ResponsesModel) \
|
||||
.filter(ResponsesModel.poll_id == poll_id)
|
||||
responses = {}
|
||||
|
||||
for response in raw_responses.all():
|
||||
if responses.get(response.choice):
|
||||
responses[response.choice] += 1
|
||||
else:
|
||||
responses[response.choice] = 1
|
||||
|
||||
for i, field in enumerate(content.get('fields')):
|
||||
responders = responses.get(i, 0)
|
||||
chart_options.get('data') \
|
||||
.get('labels') \
|
||||
.append(field.get('name')[5:].replace('__', ''))
|
||||
chart_options.get('data') \
|
||||
.get('datasets')[0] \
|
||||
.get('data') \
|
||||
.append(responders)
|
||||
|
||||
if responders <= 1:
|
||||
field['value'] = f"**{responders}** vote"
|
||||
else:
|
||||
field['value'] = f"**{responders}** votes"
|
||||
|
||||
e = discord.Embed(description=content.get('description'))
|
||||
e.set_author(
|
||||
name=content.get('author').get('name'),
|
||||
icon_url=content.get('author').get('icon_url')
|
||||
)
|
||||
chart_url = URL(chart_base_url + json.dumps(chart_options))
|
||||
e.set_thumbnail(url=str(chart_url))
|
||||
for field in content.get('fields'):
|
||||
e.add_field(
|
||||
name=field.get('name'),
|
||||
value=field.get('value'),
|
||||
inline=True
|
||||
)
|
||||
e.set_footer(text=content.get('footer').get('text'))
|
||||
|
||||
await message.edit(embed=e)
|
||||
|
||||
poll.content = json.dumps(content)
|
||||
self.bot.database.session.commit()
|
||||
|
||||
@group_extra(name='poll', aliases=['sondage'], category='poll')
|
||||
async def _poll(self, ctx: commands.Context):
|
||||
if ctx.invoked_subcommand is None:
|
||||
await ctx.send_help('poll')
|
||||
|
||||
@_poll.group(name='create', aliases=['new', 'nouveau'])
|
||||
async def _poll_create(self, ctx: commands.Context, *, poll: str):
|
||||
is_anonymous = '--anonyme' in poll
|
||||
poll = poll.replace('--anonyme', '')
|
||||
|
||||
await self.create_poll(ctx, poll, anonymous=is_anonymous)
|
||||
|
||||
|
||||
def setup(bot: TuxBot):
|
||||
bot.add_cog(Poll(bot))
|
410
cogs/Useful.py
410
cogs/Useful.py
|
@ -1,410 +0,0 @@
|
|||
# Created by romain at 04/01/2020
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import platform
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import time
|
||||
from socket import AF_INET6
|
||||
from io import BytesIO
|
||||
from PIL import Image
|
||||
from PIL import ImageFont
|
||||
from PIL import ImageDraw
|
||||
from PIL import ImageOps
|
||||
|
||||
import aiohttp
|
||||
import discord
|
||||
import humanize
|
||||
import psutil
|
||||
from discord.ext import commands
|
||||
from tcp_latency import measure_latency
|
||||
|
||||
from bot import TuxBot
|
||||
from utils import Texts
|
||||
from utils import command_extra, group_extra
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Useful(commands.Cog):
|
||||
|
||||
def __init__(self, bot: TuxBot):
|
||||
self.bot = bot
|
||||
self.icon = ":toolbox:"
|
||||
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/120/twitter/233/toolbox_1f9f0.png"
|
||||
|
||||
@staticmethod
|
||||
def _latest_commits():
|
||||
cmd = 'git log -n 3 -s --format="[\`%h\`](https://git.gnous.eu/gnouseu/tuxbot-bot/commits/%H) %s (%cr)"'
|
||||
|
||||
return os.popen(cmd).read().strip()
|
||||
|
||||
@staticmethod
|
||||
def fetch_info():
|
||||
total_lines = 0
|
||||
total_python_lines = 0
|
||||
file_amount = 0
|
||||
python_file_amount = 0
|
||||
ENV = "env"
|
||||
|
||||
for path, _, files in os.walk("."):
|
||||
for name in files:
|
||||
file_dir = str(pathlib.PurePath(path, name))
|
||||
if (
|
||||
not name.endswith(".py")
|
||||
and not name.endswith(".po")
|
||||
and not name.endswith(".json")
|
||||
) or ENV in file_dir:
|
||||
continue
|
||||
file_amount += 1
|
||||
python_file_amount += 1 if name.endswith(".py") else 0
|
||||
with open(file_dir, "r", encoding="utf-8") as file:
|
||||
for line in file:
|
||||
if not line.strip().startswith("#") \
|
||||
or not line.strip():
|
||||
total_lines += 1
|
||||
total_python_lines += 1 if name.endswith(".py") \
|
||||
else 0
|
||||
|
||||
return (file_amount, total_lines), (
|
||||
python_file_amount, total_python_lines)
|
||||
|
||||
@staticmethod
|
||||
def luhn_checker(number: int):
|
||||
digits = [int(x) for x in reversed(str(number))]
|
||||
|
||||
for index, digit in enumerate(digits, start=1):
|
||||
digit = digit * 2 if index % 2 == 0 else digit
|
||||
if digit >= 10:
|
||||
digit = sum(int(x) for x in list(str(digit)))
|
||||
|
||||
digits[index - 1] = digit
|
||||
|
||||
return sum(digits) % 10 == 0
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='iplocalise', category='network')
|
||||
async def _iplocalise(self, ctx: commands.Context, addr, ip_type=''):
|
||||
addr = re.sub(r'http(s?)://', '', addr)
|
||||
addr = addr[:-1] if addr.endswith('/') else addr
|
||||
|
||||
await ctx.trigger_typing()
|
||||
|
||||
try:
|
||||
if 'v6' in ip_type:
|
||||
try:
|
||||
ip = socket.getaddrinfo(addr, None, AF_INET6)[1][4][0]
|
||||
except socket.gaierror:
|
||||
return await ctx.send(
|
||||
Texts('useful', ctx).get('ipv6 not available'))
|
||||
else:
|
||||
ip = socket.gethostbyname(addr)
|
||||
|
||||
async with self.bot.session.get(f"http://ip-api.com/json/{ip}") \
|
||||
as s:
|
||||
response: dict = await s.json()
|
||||
if response.get('status') == 'success':
|
||||
e = discord.Embed(
|
||||
title=f"{Texts('useful', ctx).get('Information for')}"
|
||||
f" ``{addr}`` *`({response.get('query')})`*",
|
||||
color=0x5858d7
|
||||
)
|
||||
|
||||
e.add_field(
|
||||
name=Texts('useful', ctx).get('Belongs to :'),
|
||||
value=response['org'] if response['org'] else 'N/A',
|
||||
inline=False
|
||||
)
|
||||
|
||||
e.add_field(
|
||||
name=Texts('useful', ctx).get('Is located at :'),
|
||||
value=response['city'] if response['city'] else 'N/A',
|
||||
inline=True
|
||||
)
|
||||
|
||||
e.add_field(
|
||||
name="Region :",
|
||||
value=f"{response['regionName'] if response['regionName'] else 'N/A'} "
|
||||
f"({response['country'] if response['country'] else 'N/A'})",
|
||||
inline=True
|
||||
)
|
||||
|
||||
e.set_thumbnail(
|
||||
url=f"https://www.countryflags.io/"
|
||||
f"{response.get('countryCode')}/flat/64.png")
|
||||
|
||||
await ctx.send(embed=e)
|
||||
else:
|
||||
await ctx.send(
|
||||
content=f"{Texts('useful', ctx).get('info not available')}"
|
||||
f"``{response['query'] if response['query'] else 'N/A'}``")
|
||||
|
||||
except Exception as e:
|
||||
await ctx.send(e)
|
||||
await ctx.send(
|
||||
f"{Texts('useful', ctx).get('Cannot connect to host')} {addr}"
|
||||
)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='getheaders', category='network')
|
||||
async def _getheaders(self, ctx: commands.Context, addr: str):
|
||||
if (addr.startswith('http') or addr.startswith('ftp')) is not True:
|
||||
addr = f"http://{addr}"
|
||||
|
||||
await ctx.trigger_typing()
|
||||
|
||||
try:
|
||||
async with self.bot.session.get(addr) as s:
|
||||
e = discord.Embed(
|
||||
title=f"{Texts('useful', ctx).get('Headers of')} {addr}",
|
||||
color=0xd75858
|
||||
)
|
||||
e.add_field(name="Status", value=s.status, inline=True)
|
||||
e.set_thumbnail(url=f"https://http.cat/{s.status}")
|
||||
|
||||
headers = dict(s.headers.items())
|
||||
headers.pop('Set-Cookie', headers)
|
||||
|
||||
for key, value in headers.items():
|
||||
e.add_field(name=key, value=value, inline=True)
|
||||
await ctx.send(embed=e)
|
||||
|
||||
except aiohttp.ClientError:
|
||||
await ctx.send(
|
||||
f"{Texts('useful', ctx).get('Cannot connect to host')} {addr}"
|
||||
)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='git', aliases=['sources', 'source', 'github'], category='misc')
|
||||
async def _git(self, ctx):
|
||||
e = discord.Embed(
|
||||
title=Texts('useful', ctx).get('git repo'),
|
||||
description=Texts('useful', ctx).get('git text'),
|
||||
colour=0xE9D460
|
||||
)
|
||||
e.set_author(
|
||||
name='Gnous',
|
||||
icon_url="https://cdn.gnous.eu/logo1.png"
|
||||
)
|
||||
await ctx.send(embed=e)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='quote', category='misc')
|
||||
async def _quote(self, ctx, message_id: discord.Message):
|
||||
e = discord.Embed(
|
||||
colour=message_id.author.colour,
|
||||
description=message_id.clean_content,
|
||||
timestamp=message_id.created_at
|
||||
)
|
||||
e.set_author(
|
||||
name=message_id.author.display_name,
|
||||
icon_url=message_id.author.avatar_url_as(format="jpg")
|
||||
)
|
||||
if len(message_id.attachments) >= 1:
|
||||
e.set_image(url=message_id.attachments[0].url)
|
||||
|
||||
e.add_field(name="**Original**",
|
||||
value=f"[Go!]({message_id.jump_url})")
|
||||
e.set_footer(text="#" + message_id.channel.name)
|
||||
|
||||
await ctx.send(embed=e)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='ping', category='network')
|
||||
async def _ping(self, ctx: commands.Context):
|
||||
start = time.perf_counter()
|
||||
await ctx.trigger_typing()
|
||||
end = time.perf_counter()
|
||||
|
||||
latency = round(self.bot.latency * 1000, 2)
|
||||
typing = round((end - start) * 1000, 2)
|
||||
discordapp = measure_latency(host='discordapp.com', wait=0)[0]
|
||||
|
||||
e = discord.Embed(title='Ping', color=discord.Color.teal())
|
||||
e.add_field(name='Websocket', value=f'{latency}ms')
|
||||
e.add_field(name='Typing', value=f'{typing}ms')
|
||||
e.add_field(name='discordapp.com', value=f'{discordapp}ms')
|
||||
await ctx.send(embed=e)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='info', aliases=['about'], category='misc')
|
||||
async def _info(self, ctx: commands.Context):
|
||||
proc = psutil.Process()
|
||||
total, python = self.fetch_info()
|
||||
|
||||
with proc.oneshot():
|
||||
mem = proc.memory_full_info()
|
||||
e = discord.Embed(
|
||||
title=Texts('useful', ctx).get('Information about TuxBot'),
|
||||
color=0x89C4F9)
|
||||
|
||||
e.add_field(
|
||||
name=f"__:busts_in_silhouette: "
|
||||
f"{Texts('useful', ctx).get('Development')}__",
|
||||
value=f"**Romain#5117:** [git](https://git.gnous.eu/Romain)\n"
|
||||
f"**Outout#4039:** [git](https://git.gnous.eu/mael)\n",
|
||||
inline=True
|
||||
)
|
||||
e.add_field(
|
||||
name="__<:python:596577462335307777> Python__",
|
||||
value=f"**python** `{platform.python_version()}`\n"
|
||||
f"**discord.py** `{discord.__version__}`",
|
||||
inline=True
|
||||
)
|
||||
e.add_field(
|
||||
name="__:gear: Usage__",
|
||||
value=f"**{humanize.naturalsize(mem.rss)}** "
|
||||
f"{Texts('useful', ctx).get('physical memory')}\n"
|
||||
f"**{humanize.naturalsize(mem.vms)}** "
|
||||
f"{Texts('useful', ctx).get('virtual memory')}\n",
|
||||
inline=True
|
||||
)
|
||||
|
||||
e.add_field(
|
||||
name=f"__{Texts('useful', ctx).get('Servers count')}__",
|
||||
value=str(len(self.bot.guilds)),
|
||||
inline=True
|
||||
)
|
||||
e.add_field(
|
||||
name=f"__{Texts('useful', ctx).get('Channels count')}__",
|
||||
value=str(len([_ for _ in self.bot.get_all_channels()])),
|
||||
inline=True
|
||||
)
|
||||
e.add_field(
|
||||
name=f"__{Texts('useful', ctx).get('Members count')}__",
|
||||
value=str(len([_ for _ in self.bot.get_all_members()])),
|
||||
inline=True
|
||||
)
|
||||
|
||||
e.add_field(
|
||||
name=f"__:file_folder: {Texts('useful', ctx).get('Files')}__",
|
||||
value=f"{total[0]} *({python[0]} <:python:596577462335307777>)*",
|
||||
inline=True
|
||||
)
|
||||
e.add_field(
|
||||
name=f"__¶ {Texts('useful', ctx).get('Lines')}__",
|
||||
value=f"{total[1]} *({python[1]} <:python:596577462335307777>)*",
|
||||
inline=True
|
||||
)
|
||||
|
||||
e.add_field(
|
||||
name=f"__{Texts('useful', ctx).get('Latest changes')}__",
|
||||
value=self._latest_commits(),
|
||||
inline=False
|
||||
)
|
||||
|
||||
e.add_field(
|
||||
name=f"__:link: {Texts('useful', ctx).get('Links')}__",
|
||||
value="[tuxbot.gnous.eu](https://tuxbot.gnous.eu/) "
|
||||
"| [gnous.eu](https://gnous.eu/) "
|
||||
"| [git](https://git.gnous.eu/gnouseu/tuxbot-bot) "
|
||||
"| [status](https://status.gnous.eu/check/154250) "
|
||||
f"| [{Texts('useful', ctx).get('Invite')}](https://discordapp.com/oauth2/authorize?client_id=301062143942590465&scope=bot&permissions=268749888)",
|
||||
inline=False
|
||||
)
|
||||
|
||||
e.set_footer(text=f'version: {self.bot.version} '
|
||||
f'• prefix: {ctx.prefix}')
|
||||
|
||||
await ctx.send(embed=e)
|
||||
|
||||
###########################################################################
|
||||
|
||||
@command_extra(name='credits', aliases=['contributors', 'authors'], category='misc')
|
||||
async def _credits(self, ctx: commands.Context):
|
||||
e = discord.Embed(
|
||||
title=Texts('useful', ctx).get('Contributors'),
|
||||
color=0x36393f
|
||||
)
|
||||
|
||||
e.add_field(
|
||||
name="**Outout#4039** ",
|
||||
value="• https://git.gnous.eu/mael ⠀\n"
|
||||
"• mael@gnous.eu\n"
|
||||
"• [@outoutxyz](https://twitter.com/outouxyz)",
|
||||
inline=True
|
||||
)
|
||||
e.add_field(
|
||||
name="**Romain#5117** ",
|
||||
value="• https://git.gnous.eu/Romain\n"
|
||||
"• romain@gnous.eu",
|
||||
inline=True
|
||||
)
|
||||
|
||||
await ctx.send(embed=e)
|
||||
|
||||
###########################################################################
|
||||
@group_extra(name='cb', aliases=['cc'], category='misc')
|
||||
@commands.cooldown(1, 5, type=commands.BucketType.user)
|
||||
async def _cb(self, ctx: commands.Context):
|
||||
if ctx.invoked_subcommand is None:
|
||||
await ctx.send_help('cb')
|
||||
|
||||
@_cb.command(name='validate', aliases=['valid', 'correct'], category='misc')
|
||||
@commands.cooldown(1, 5, type=commands.BucketType.user)
|
||||
async def _cb_validate(self, ctx: commands.Context, *, number: int):
|
||||
valid = self.luhn_checker(number)
|
||||
|
||||
await ctx.send(
|
||||
Texts(
|
||||
'useful', ctx
|
||||
).get(
|
||||
'valid_credit_card'
|
||||
if valid
|
||||
else 'invalid_credit_card'
|
||||
)
|
||||
)
|
||||
|
||||
@_cb.command(name='generate', aliases=['new', 'get'], category='misc')
|
||||
@commands.cooldown(1, 5, type=commands.BucketType.user)
|
||||
async def _cb_generate(self, ctx: commands.Context):
|
||||
await ctx.channel.trigger_typing()
|
||||
|
||||
number = random.randint(4000_0000_0000_0000, 5999_9999_9999_9999)
|
||||
while not self.luhn_checker(number):
|
||||
number = random.randint(4000_0000_0000_0000, 5999_9999_9999_9999)
|
||||
number = str(number)
|
||||
cvv = ''.join(random.choice("abcdefghij") for _ in range(3))
|
||||
|
||||
with Image.open("utils/images/blank_credit_card.png") as blank:
|
||||
cc_font = ImageFont.truetype('utils/fonts/credit_card.ttf', 26)
|
||||
user_font = ImageFont.truetype('utils/fonts/credit_card.ttf', 20)
|
||||
draw = ImageDraw.Draw(blank)
|
||||
|
||||
cvv_text = Image.new('L', (500, 50))
|
||||
cvv_draw = ImageDraw.Draw(cvv_text)
|
||||
cvv_draw.text((0, 0), cvv, font=user_font, fill=255)
|
||||
cvv_rotated = cvv_text.rotate(23, expand=1)
|
||||
|
||||
draw.text(
|
||||
(69, 510),
|
||||
' '.join([number[i:i+4] for i in range(0, len(number), 4)]),
|
||||
(210, 210, 210),
|
||||
font=cc_font
|
||||
)
|
||||
|
||||
draw.text(
|
||||
(69, 550),
|
||||
ctx.author.name.upper(),
|
||||
(210, 210, 210),
|
||||
font=user_font
|
||||
)
|
||||
blank.paste(ImageOps.colorize(cvv_rotated, (0, 0, 0), (0, 0, 0)), (470, 0), cvv_rotated)
|
||||
|
||||
output = BytesIO()
|
||||
blank.save(output, 'png')
|
||||
output.seek(0)
|
||||
|
||||
await ctx.send(file=discord.File(fp=output, filename="credit_card.png"))
|
||||
|
||||
|
||||
def setup(bot: TuxBot):
|
||||
bot.add_cog(Useful(bot))
|
64
cogs/User.py
64
cogs/User.py
|
@ -1,64 +0,0 @@
|
|||
import logging
|
||||
|
||||
from discord.ext import commands
|
||||
|
||||
from bot import TuxBot
|
||||
from utils import AliasesModel
|
||||
from utils import Texts
|
||||
from utils import group_extra
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class User(commands.Cog):
|
||||
|
||||
def __init__(self, bot: TuxBot):
|
||||
self.bot = bot
|
||||
self.icon = ":bust_in_silhouette:"
|
||||
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/120/twitter/233/bust-in-silhouette_1f464.png"
|
||||
|
||||
###########################################################################
|
||||
|
||||
@group_extra(name='alias', aliases=['aliases'], category='alias')
|
||||
async def _alias(self, ctx: commands.Context):
|
||||
if ctx.invoked_subcommand is None:
|
||||
await ctx.send_help('alias')
|
||||
|
||||
@_alias.command(name='add', aliases=['set', 'new'])
|
||||
async def _alias_add(self, ctx: commands.Context, *, user_alias: str):
|
||||
is_global = False
|
||||
if '--global' in user_alias:
|
||||
is_global = True
|
||||
user_alias.replace('--global', '')
|
||||
|
||||
user_alias = user_alias.split(' -> ')
|
||||
if len(user_alias) != 2:
|
||||
return await ctx.send_help('alias')
|
||||
|
||||
command = user_alias[1]
|
||||
user_alias = user_alias[0]
|
||||
|
||||
if self.bot.get_command(command) is None:
|
||||
return await ctx.send(Texts('user').get('Command not found'))
|
||||
|
||||
alias = AliasesModel(
|
||||
user_id=ctx.author.id,
|
||||
alias=user_alias,
|
||||
command=command,
|
||||
guild="global" if is_global else str(ctx.guild.id)
|
||||
)
|
||||
|
||||
self.bot.database.session.add(alias)
|
||||
self.bot.database.session.commit()
|
||||
|
||||
@_alias.command(name='remove', aliases=['drop', 'del', 'delete'])
|
||||
async def _alias_remove(self, ctx: commands.Context, prefix: str):
|
||||
...
|
||||
|
||||
@_alias.command(name='list', aliases=['show', 'all'])
|
||||
async def _alias_list(self, ctx: commands.Context):
|
||||
...
|
||||
|
||||
|
||||
def setup(bot: TuxBot):
|
||||
bot.add_cog(User(bot))
|
Loading…
Add table
Add a link
Reference in a new issue