refactor(cogs): prepare the env for help command

This commit is contained in:
Romain J 2020-01-04 02:46:12 +01:00
commit fdf220cdfa
27 changed files with 452 additions and 261 deletions

View file

@ -1,17 +1,16 @@
import asyncio
import datetime
import logging
from typing import Union
import asyncio
import discord
import humanize
from discord.ext import commands
from bot import TuxBot
from .utils.lang import Texts
from .utils.extra import commandExtra, groupExtra
from .utils.models import WarnModel, LangModel
from utils import Texts
from utils import WarnModel, LangModel
from utils import commandExtra, groupExtra
log = logging.getLogger(__name__)
@ -20,6 +19,8 @@ class Admin(commands.Cog):
def __init__(self, bot: TuxBot):
self.bot = bot
self.icon = ":shield:"
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/160/twitter/233/shield_1f6e1.png"
async def cog_check(self, ctx: commands.Context) -> bool:
permissions: discord.Permissions = ctx.channel.permissions_for(

220
cogs/Help.py Normal file
View file

@ -0,0 +1,220 @@
# Created by romain at 04/01/2020
import logging
import discord
from discord.ext import commands
from bot import TuxBot
from utils import FieldPages
from utils import commandsPlus
log = logging.getLogger(__name__)
class HelpCommand(commands.HelpCommand):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ignore_cogs = ["Monitoring", "Help", "Logs"]
self.owner_cogs = []
def get_command_signature(self, command):
return f"[{command.cog.qualified_name.upper()}] > {command.qualified_name}"
def common_command_formatting(self, emb, command):
emb.title = self.get_command_signature(command)
if command.cog_name != "Jishaku":
emb.set_thumbnail(url=command.cog.big_icon)
try:
emb.description = f"{command.cog.qualified_name.lower()}_help " \
f"{command.parent}_{command.name}_description"
except:
emb.description = f"{command.cog.qualified_name.lower()}_help " \
f"{command.name}_description"
usage = "help.command_help.usage"
try:
if command.parent:
try:
usg = f"{command.cog.qualified_name.lower()}_help " \
f"{command.parent}_{command.name}_usage"
except:
usg = f"{command.cog.qualified_name.lower()}_help " \
f"{command.name}_usage"
else:
usg = f"{command.cog.qualified_name.lower()}_help " \
f"{command.name}_usage"
emb.add_field(name=usage,
value=f"{self.context.prefix}{command.qualified_name} " + usg)
except KeyError:
emb.add_field(name=usage,
value=f"{self.context.prefix}{command.qualified_name}")
aliases = "`" + '`, `'.join(command.aliases) + "`"
if aliases == "``":
aliases = "help " \
"help.command_help.no_aliases"
emb.add_field(name="help "
"help.command_help.aliases",
value=aliases)
return emb
async def command_callback(self, ctx, *, command=None):
await self.prepare_help_command(ctx, command)
if command is None:
mapping = self.get_bot_mapping()
return await self.send_bot_help(mapping)
cog = ctx.bot.get_cog(command.title())
if cog is not None:
return await self.send_cog_help(cog)
maybe_coro = discord.utils.maybe_coroutine
keys = command.split(' ')
cmd = ctx.bot.all_commands.get(keys[0])
if cmd is None:
string = await maybe_coro(self.command_not_found,
self.remove_mentions(keys[0]))
return await self.send_error_message(string)
for key in keys[1:]:
try:
found = cmd.all_commands.get(key)
except AttributeError:
string = await maybe_coro(self.subcommand_not_found, cmd,
self.remove_mentions(key))
return await self.send_error_message(string)
else:
if found is None:
string = await maybe_coro(self.subcommand_not_found,
cmd,
self.remove_mentions(key))
return await self.send_error_message(string)
cmd = found
if isinstance(cmd, commands.Group):
return await self.send_group_help(cmd)
else:
return await self.send_command_help(cmd)
async def send_bot_help(self, mapping):
owner = self.context.bot.owner
emb = discord.Embed(color=discord.colour.Color.blue())
emb.description = "help " \
"help.main_page.description".format(owner)
emb.set_author(icon_url=self.context.author.avatar_url,
name=self.context.author)
cogs = ""
for extension in self.context.bot.cogs.values():
if self.context.author != owner and extension.qualified_name.upper() in self.owner_cogs:
continue
if self.context.author == owner and extension.qualified_name in self.ignore_cogs:
continue
if extension.qualified_name == "Jishaku":
continue
cogs += f"{extension.icon} **{extension.qualified_name}**\n"
emb.add_field(name="help "
"help.main_page.field_title.categories",
value=cogs)
await self.context.send(embed=emb)
async def send_command_help(self, command):
if command.cog_name in self.ignore_cogs:
return await self.send_error_message(
self.command_not_found(command.name))
if isinstance(command, commandsPlus):
if command.name == "jishaku":
pass
formatted = self.common_command_formatting(
discord.Embed(color=discord.colour.Color.blue()), command)
await self.context.send(embed=formatted)
async def send_group_help(self, group):
if group.cog_name in self.ignore_cogs:
return await self.send_error_message(
self.command_not_found(group.name))
formatted = self.common_command_formatting(
discord.Embed(color=discord.colour.Color.blue()), group)
sub_cmd_list = ""
for group_command in group.commands:
try:
sub_cmd_list += f"`╚╡` **{group_command.name}** - " \
f"{group.cog.qualified_name.lower()}_help " \
f"{group_command.parent}_{group_command.name}_brief\n"
except Exception:
sub_cmd_list += f"`╚╡` **{group_command.name}** - " \
f"{group.cog.qualified_name.lower()}_help" \
f"{group_command.name}_brief\n"
subcommands = "help.command_help.subcommands"
formatted.add_field(name=subcommands, value=sub_cmd_list,
inline=False)
await self.context.send(embed=formatted)
async def send_cog_help(self, cog):
if (
cog.qualified_name.upper() in self.owner_cogs
and not await self.context.bot.is_owner(self.context.author)
) or cog.qualified_name.upper() in self.ignore_cogs:
return
if cog.qualified_name == "Jishaku":
return
if cog.qualified_name in self.ignore_cogs:
return
pages = {}
for cmd in cog.get_commands():
if not await self.context.bot.is_owner(
self.context.author) and (
cmd.hidden or cmd.category == "Hidden"):
continue
if cmd.category not in pages:
pages[cmd.category] = "```asciidoc\n"
cmd_brief = f"{cog.qualified_name.lower()}_help " \
f"{cmd.name}_brief"
pages[
cmd.category] += f"{cmd.name}{' ' * int(17 - len(cmd.name))}:: {cmd_brief}\n"
if isinstance(cmd, commands.Group):
for group_command in cmd.commands:
try:
cmd_brief = f"{cog.qualified_name.lower()}_help " \
f"{group_command.parent}_{group_command.name}_brief"
except Exception:
cmd_brief = f"{cog.qualified_name.lower()}_help " \
f"{group_command.name}_brief"
pages[
cmd.category] += f"{group_command.name}{' ' * int(15 - len(group_command.name))}:: {cmd_brief}\n"
for e in pages:
pages[e] += "```"
formatted = []
for name, cont in pages.items():
formatted.append((name, cont))
footer_text = "help " \
"help.category_page.footer_info".format(self.context.prefix)
pages = FieldPages(self.context,
embed_color=discord.colour.Color.blue(),
entries=formatted,
title=cog.qualified_name.upper(),
thumbnail=cog.big_icon,
footertext=footer_text,
per_page=1)
await pages.paginate()
def command_not_found(self, string):
return 'No command called "{}" found.'.format(string)
class Help(commands.Cog):
def __init__(self, bot: TuxBot):
bot.help_command = HelpCommand()
def setup(bot: TuxBot):
bot.add_cog(Help(bot))

View file

@ -19,8 +19,8 @@ import psutil
from discord.ext import commands, tasks
from bot import TuxBot
from .utils import Texts
from .utils.extra import commandExtra
from utils import Texts
from utils import commandExtra
log = logging.getLogger(__name__)

View file

@ -1,6 +1,6 @@
from datetime import datetime
import logging
import urllib.request
from datetime import datetime
import discord
from aiohttp import web
@ -30,11 +30,11 @@ class Monitoring(commands.Cog):
@tasks.loop(seconds=10.0)
async def ping_clusters(self):
for cluster in self.bot.clusters:
for cluster in self.bot.fallbacks:
if cluster == 'DEFAULT':
pass
else:
cluster = self.bot.clusters[cluster]
cluster = self.bot.fallbacks[cluster]
if not cluster.get('This', False):
host = cluster.get('Host')
port = cluster.get('Port')

View file

@ -7,10 +7,9 @@ from discord.ext import commands
from yarl import URL
from bot import TuxBot
from .utils.lang import Texts
from .utils.models import PollModel, ResponsesModel
from .utils.extra import groupExtra
from .utils import emotes as utils_emotes
from utils import PollModel, ResponsesModel
from utils import Texts, emotes as utils_emotes
from utils import groupExtra
log = logging.getLogger(__name__)
@ -19,6 +18,8 @@ class Polls(commands.Cog):
def __init__(self, bot: TuxBot):
self.bot = bot
self.icon = ":bar_chart:"
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/120/twitter/233/bar-chart_1f4ca.png:"
def get_poll(self, pld) -> Union[bool, PollModel]:
if pld.user_id != self.bot.user.id:
@ -76,7 +77,8 @@ class Polls(commands.Cog):
await self.update_poll(poll.id)
@commands.Cog.listener()
async def on_raw_reaction_remove(self, pld: discord.RawReactionActionEvent):
async def on_raw_reaction_remove(self,
pld: discord.RawReactionActionEvent):
poll = self.get_poll(pld)
if poll:
@ -156,8 +158,8 @@ class Polls(commands.Cog):
content = json.loads(poll.content) \
if isinstance(poll.content, str) \
else poll.content
raw_responses = self.bot.database.session\
.query(ResponsesModel)\
raw_responses = self.bot.database.session \
.query(ResponsesModel) \
.filter(ResponsesModel.poll_id == poll_id)
responses = {}

View file

@ -1,26 +1,34 @@
# Created by romain at 04/01/2020
import logging
import os
import pathlib
import platform
import re
import socket
import time
from socket import AF_INET6
import aiohttp
import discord
import humanize
import psutil
from discord.ext import commands
from tcp_latency import measure_latency
from bot import TuxBot
from .utils.lang import Texts
from .utils.extra import commandExtra
from tcp_latency import measure_latency
from utils import Texts
from utils import commandExtra
log = logging.getLogger(__name__)
class Basics(commands.Cog):
class Useful(commands.Cog):
def __init__(self, bot: TuxBot):
self.bot = bot
self.icon = ":toolbox:"
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/120/twitter/233/toolbox_1f9f0.png"
@staticmethod
def _latest_commits():
@ -30,6 +38,142 @@ class Basics(commands.Cog):
###########################################################################
@commandExtra(name='iplocalise', category='utility',
description=Texts('commands').get('utility._iplocalise'))
async def _iplocalise(self, ctx: commands.Context, addr, ip_type=''):
addr = re.sub(r'http(s?)://', '', addr)
addr = addr[:-1] if addr.endswith('/') else addr
await ctx.trigger_typing()
try:
if ip_type in ('v6', 'ipv6'):
try:
ip = socket.getaddrinfo(addr, None, AF_INET6)[1][4][0]
except socket.gaierror:
return await ctx.send(
Texts('utility', ctx).get('ipv6 not available'))
else:
ip = socket.gethostbyname(addr)
async with self.bot.session.get(f"http://ip-api.com/json/{ip}") \
as s:
response: dict = await s.json()
if response.get('status') == 'success':
e = discord.Embed(
title=f"{Texts('utility', ctx).get('Information for')}"
f" ``{addr}`` *`({response.get('query')})`*",
color=0x5858d7
)
e.add_field(
name=Texts('utility', ctx).get('Belongs to :'),
value=response.get('org', 'N/A'),
inline=False
)
e.add_field(
name=Texts('utility', ctx).get('Is located at :'),
value=response.get('city', 'N/A'),
inline=True
)
e.add_field(
name="Region :",
value=f"{response.get('regionName', 'N/A')} "
f"({response.get('country', 'N/A')})",
inline=True
)
e.set_thumbnail(
url=f"https://www.countryflags.io/"
f"{response.get('countryCode')}/flat/64.png")
await ctx.send(embed=e)
else:
await ctx.send(
content=f"{Texts('utility', ctx).get('info not available')}"
f"``{response.get('query')}``")
except Exception:
await ctx.send(
f"{Texts('utility', ctx).get('Cannot connect to host')} {addr}"
)
###########################################################################
@commandExtra(name='getheaders', category='utility',
description=Texts('commands').get('utility._getheaders'))
async def _getheaders(self, ctx: commands.Context, addr: str):
if (addr.startswith('http') or addr.startswith('ftp')) is not True:
addr = f"http://{addr}"
await ctx.trigger_typing()
try:
async with self.bot.session.get(addr) as s:
e = discord.Embed(
title=f"{Texts('utility', ctx).get('Headers of')} {addr}",
color=0xd75858
)
e.add_field(name="Status", value=s.status, inline=True)
e.set_thumbnail(url=f"https://http.cat/{s.status}")
headers = dict(s.headers.items())
headers.pop('Set-Cookie', headers)
for key, value in headers.items():
e.add_field(name=key, value=value, inline=True)
await ctx.send(embed=e)
except aiohttp.client_exceptions.ClientError:
await ctx.send(
f"{Texts('utility', ctx).get('Cannot connect to host')} {addr}"
)
###########################################################################
@commandExtra(name='git', aliases=['sources', 'source', 'github'],
category='utility',
description=Texts('commands').get('utility._git'))
async def _git(self, ctx):
e = discord.Embed(
title=Texts('utility', ctx).get('git repo'),
description=Texts('utility', ctx).get('git text'),
colour=0xE9D460
)
e.set_author(
name='Gnous',
icon_url="https://cdn.gnous.eu/logo1.png"
)
await ctx.send(embed=e)
###########################################################################
@commandExtra(name='quote', category='utility',
description=Texts('commands').get('utility._quote'))
async def _quote(self, ctx, message_id: discord.Message):
e = discord.Embed(
colour=message_id.author.colour,
description=message_id.clean_content,
timestamp=message_id.created_at
)
e.set_author(
name=message_id.author.display_name,
icon_url=message_id.author.avatar_url_as(format="jpg")
)
if len(message_id.attachments) >= 1:
e.set_image(url=message_id.attachments[0].url)
e.add_field(name="**Original**",
value=f"[Go!]({message_id.jump_url})")
e.set_footer(text="#" + message_id.channel.name)
await ctx.send(embed=e)
###########################################################################
@commandExtra(name='ping', category='basics',
description=Texts('commands').get('basics._ping'))
async def _ping(self, ctx: commands.Context):
@ -178,4 +322,4 @@ class Basics(commands.Cog):
def setup(bot: TuxBot):
bot.add_cog(Basics(bot))
bot.add_cog(Useful(bot))

View file

@ -3,9 +3,9 @@ import logging
from discord.ext import commands
from bot import TuxBot
from .utils.extra import groupExtra
from .utils.lang import Texts
from .utils.models import AliasesModel
from utils import AliasesModel
from utils import Texts
from utils import groupExtra
log = logging.getLogger(__name__)
@ -14,6 +14,8 @@ class User(commands.Cog):
def __init__(self, bot: TuxBot):
self.bot = bot
self.icon = ":bust_in_silhouette:"
self.big_icon = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/120/twitter/233/bust-in-silhouette_1f464.png"
###########################################################################

View file

@ -1,160 +0,0 @@
import logging
import re
import aiohttp
import discord
from discord.ext import commands
from bot import TuxBot
import socket
from socket import AF_INET6
from .utils.lang import Texts
from .utils.extra import commandExtra
log = logging.getLogger(__name__)
class Utility(commands.Cog):
def __init__(self, bot: TuxBot):
self.bot = bot
###########################################################################
@commandExtra(name='iplocalise', category='utility',
description=Texts('commands').get('utility._iplocalise'))
async def _iplocalise(self, ctx: commands.Context, addr, ip_type=''):
addr = re.sub(r'http(s?)://', '', addr)
addr = addr[:-1] if addr.endswith('/') else addr
await ctx.trigger_typing()
try:
if ip_type in ('v6', 'ipv6'):
try:
ip = socket.getaddrinfo(addr, None, AF_INET6)[1][4][0]
except socket.gaierror:
return await ctx.send(
Texts('utility', ctx).get('ipv6 not available'))
else:
ip = socket.gethostbyname(addr)
async with self.bot.session.get(f"http://ip-api.com/json/{ip}") \
as s:
response: dict = await s.json()
if response.get('status') == 'success':
e = discord.Embed(
title=f"{Texts('utility', ctx).get('Information for')}"
f" ``{addr}`` *`({response.get('query')})`*",
color=0x5858d7
)
e.add_field(
name=Texts('utility', ctx).get('Belongs to :'),
value=response.get('org', 'N/A'),
inline=False
)
e.add_field(
name=Texts('utility', ctx).get('Is located at :'),
value=response.get('city', 'N/A'),
inline=True
)
e.add_field(
name="Region :",
value=f"{response.get('regionName', 'N/A')} "
f"({response.get('country', 'N/A')})",
inline=True
)
e.set_thumbnail(
url=f"https://www.countryflags.io/"
f"{response.get('countryCode')}/flat/64.png")
await ctx.send(embed=e)
else:
await ctx.send(
content=f"{Texts('utility', ctx).get('info not available')}"
f"``{response.get('query')}``")
except Exception:
await ctx.send(
f"{Texts('utility', ctx).get('Cannot connect to host')} {addr}"
)
###########################################################################
@commandExtra(name='getheaders', category='utility',
description=Texts('commands').get('utility._getheaders'))
async def _getheaders(self, ctx: commands.Context, addr: str):
if (addr.startswith('http') or addr.startswith('ftp')) is not True:
addr = f"http://{addr}"
await ctx.trigger_typing()
try:
async with self.bot.session.get(addr) as s:
e = discord.Embed(
title=f"{Texts('utility', ctx).get('Headers of')} {addr}",
color=0xd75858
)
e.add_field(name="Status", value=s.status, inline=True)
e.set_thumbnail(url=f"https://http.cat/{s.status}")
headers = dict(s.headers.items())
headers.pop('Set-Cookie', headers)
for key, value in headers.items():
e.add_field(name=key, value=value, inline=True)
await ctx.send(embed=e)
except aiohttp.client_exceptions.ClientError:
await ctx.send(
f"{Texts('utility', ctx).get('Cannot connect to host')} {addr}"
)
###########################################################################
@commandExtra(name='git', aliases=['sources', 'source', 'github'],
category='utility',
description=Texts('commands').get('utility._git'))
async def _git(self, ctx):
e = discord.Embed(
title=Texts('utility', ctx).get('git repo'),
description=Texts('utility', ctx).get('git text'),
colour=0xE9D460
)
e.set_author(
name='Gnous',
icon_url="https://cdn.gnous.eu/logo1.png"
)
await ctx.send(embed=e)
###########################################################################
@commandExtra(name='quote', category='utility',
description=Texts('commands').get('utility._quote'))
async def _quote(self, ctx, message_id: discord.Message):
e = discord.Embed(
colour=message_id.author.colour,
description=message_id.clean_content,
timestamp=message_id.created_at
)
e.set_author(
name=message_id.author.display_name,
icon_url=message_id.author.avatar_url_as(format="jpg")
)
if len(message_id.attachments) >= 1:
e.set_image(url=message_id.attachments[0].url)
e.add_field(name="**Original**",
value=f"[Go!]({message_id.jump_url})")
e.set_footer(text="#" + message_id.channel.name)
await ctx.send(embed=e)
def setup(bot: TuxBot):
bot.add_cog(Utility(bot))

View file

@ -1,3 +0,0 @@
from .config import *
from .lang import *
from .version import *

View file

@ -1,36 +0,0 @@
from typing import List, Union
import configparser
class Config(configparser.ConfigParser):
__slots__ = ('name', '_db')
def __init__(self, name):
super().__init__()
self._db = super()
self._db.read(name)
def find(self, value: str, **kwargs) \
-> Union[
List[configparser.SectionProxy], configparser.SectionProxy
]:
key = kwargs.get('key', None)
first = kwargs.get('first', False)
results = []
for name, section in self._db.items():
if key is None:
for k in section.keys():
if section.get(k) == value:
results.append(section)
if first and len(results) == 1:
return results[0]
else:
if section.get(key) == value:
results.append(section)
if first and len(results) == 1:
return results[0]
return results

View file

@ -1,17 +0,0 @@
from .config import Config
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, session
class Database:
def __init__(self, config: Config):
conf_postgresql = config["postgresql"]
postgresql = 'postgresql://{}:{}@{}/{}'.format(
conf_postgresql.get("Username"), conf_postgresql.get("Password"),
conf_postgresql.get("Host"), conf_postgresql.get("DBName"))
self.engine = create_engine(postgresql, echo=False)
Session = sessionmaker()
Session.configure(bind=self.engine)
self.session: session = Session()

View file

@ -1,10 +0,0 @@
emotes = ['1⃣', '2⃣', '3⃣', '4⃣', '5⃣', '6⃣', '7⃣', '8⃣', '9⃣', '🔟', '0⃣',
'🇦', '🇧', '🇨', '🇩', '🇪', '🇫', '🇬', '🇭', '🇮']
def get(count):
return emotes[:count]
def get_index(emote):
return emotes.index(emote)

View file

@ -1,21 +0,0 @@
from discord.ext import commands
class commandsPlus(commands.Command):
def __init__(self, func, **kwargs):
super().__init__(func, **kwargs)
self.category = kwargs.get("category", 'other')
def commandExtra(*args, **kwargs):
return commands.command(*args, **kwargs, cls=commandsPlus)
class GroupPlus(commands.Group):
def __init__(self, func, **kwargs):
super().__init__(func, **kwargs)
self.category = kwargs.get("category", 'other')
def groupExtra(*args, **kwargs):
return commands.group(*args, **kwargs, cls=GroupPlus)

View file

@ -1,38 +0,0 @@
import gettext
from .config import Config
from cogs.utils.database import Database
from .models.lang import LangModel
from discord.ext import commands
class Texts:
def __init__(self, base: str = 'base', ctx: commands.Context = None):
self.locale = self.get_locale(ctx)
self.base = base
def get(self, text: str) -> str:
texts = gettext.translation(self.base, localedir='extras/locales',
languages=[self.locale])
texts.install()
return texts.gettext(text)
def set(self, lang: str):
self.locale = lang
@staticmethod
def get_locale(ctx):
database = Database(Config("./configs/config.cfg"))
if ctx is not None:
current = database.session\
.query(LangModel.value)\
.filter(LangModel.key == str(ctx.guild.id))
if current.count() > 0:
return current.one()[0]
default = database.session\
.query(LangModel.value)\
.filter(LangModel.key == 'default')\
.one()[0]
return default

View file

@ -1,7 +0,0 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from .lang import LangModel
from .warn import WarnModel
from .poll import PollModel, ResponsesModel
from .alias import AliasesModel

View file

@ -1,28 +0,0 @@
from sqlalchemy import Column, String, BigInteger, Integer
from . import Base
class AliasesModel(Base):
__tablename__ = 'aliases'
id = Column(Integer, primary_key=True)
user_id = Column(BigInteger)
alias = Column(String)
command = Column(String)
guild = Column(String)
def __repr__(self):
return "<AliasesModel(" \
"id='%s', " \
"user_id='%s', " \
"alias='%s', " \
"command='%s', " \
"guild='%s', " \
")>" % (
self.id,
self.user_id,
self.alias,
self.command,
self.guild
)

View file

@ -1,12 +0,0 @@
from . import Base
from sqlalchemy import Column, String
class LangModel(Base):
__tablename__ = 'langs'
key = Column(String, primary_key=True)
value = Column(String)
def __repr__(self):
return "<LangModel(key='%s', locale='%s')>" % (self.key, self.value)

View file

@ -1,27 +0,0 @@
from . import Base
from sqlalchemy import Column, Integer, BigInteger, JSON, ForeignKey, Boolean
from sqlalchemy.orm import relationship
class PollModel(Base):
__tablename__ = 'polls'
id = Column(Integer, primary_key=True, autoincrement=True)
channel_id = Column(BigInteger)
message_id = Column(BigInteger)
content = Column(JSON)
is_anonymous = Column(Boolean)
available_choices = Column(Integer)
choice = relationship("ResponsesModel")
class ResponsesModel(Base):
__tablename__ = 'responses'
id = Column(Integer, primary_key=True, autoincrement=True)
user = Column(BigInteger)
poll_id = Column(Integer, ForeignKey('polls.id'))
choice = Column(Integer)

View file

@ -1,19 +0,0 @@
import datetime
from . import Base
from sqlalchemy import Column, Integer, String, BIGINT, TIMESTAMP
class WarnModel(Base):
__tablename__ = 'warns'
id = Column(Integer, primary_key=True)
server_id = Column(BIGINT)
user_id = Column(BIGINT)
reason = Column(String)
created_at = Column(TIMESTAMP, default=datetime.datetime.now())
def __repr__(self):
return "<WarnModel(server_id='%s', user_id='%s', reason='%s', " \
"created_at='%s')>" \
% (self.server_id, self.user_id, self.reason, self.created_at)

View file

@ -1,316 +0,0 @@
import asyncio
import discord
from discord.ext.commands import Paginator as CommandPaginator
class CannotPaginate(Exception):
pass
class Pages:
"""Implements a paginator that queries the user for the
pagination interface.
Pages are 1-index based, not 0-index based.
If the user does not reply within 2 minutes then the pagination
interface exits automatically.
Parameters
------------
ctx: Context
The context of the command.
entries: List[str]
A list of entries to paginate.
per_page: int
How many entries show up per page.
show_entry_count: bool
Whether to show an entry count in the footer.
Attributes
-----------
embed: discord.Embed
The embed object that is being used to send pagination info.
Feel free to modify this externally. Only the description,
footer fields, and colour are internally modified.
permissions: discord.Permissions
Our permissions for the channel.
"""
def __init__(self, ctx, *, entries, per_page=12, show_entry_count=True):
self.bot = ctx.bot
self.entries = entries
self.message = ctx.message
self.channel = ctx.channel
self.author = ctx.author
self.per_page = per_page
pages, left_over = divmod(len(self.entries), self.per_page)
if left_over:
pages += 1
self.maximum_pages = pages
self.embed = discord.Embed(colour=discord.Colour.blurple())
self.paginating = len(entries) > per_page
self.show_entry_count = show_entry_count
self.reaction_emojis = [
('\N{BLACK LEFT-POINTING DOUBLE TRIANGLE WITH VERTICAL BAR}',
self.first_page),
('\N{BLACK LEFT-POINTING TRIANGLE}', self.previous_page),
('\N{BLACK RIGHT-POINTING TRIANGLE}', self.next_page),
('\N{BLACK RIGHT-POINTING DOUBLE TRIANGLE WITH VERTICAL BAR}',
self.last_page),
('\N{INPUT SYMBOL FOR NUMBERS}', self.numbered_page),
('\N{BLACK SQUARE FOR STOP}', self.stop_pages),
('\N{INFORMATION SOURCE}', self.show_help),
]
if ctx.guild is not None:
self.permissions = self.channel.permissions_for(ctx.guild.me)
else:
self.permissions = self.channel.permissions_for(ctx.bot.user)
if not self.permissions.embed_links:
raise CannotPaginate('Bot does not have embed links permission.')
if not self.permissions.send_messages:
raise CannotPaginate('Bot cannot send messages.')
if self.paginating:
# verify we can actually use the pagination session
if not self.permissions.add_reactions:
raise CannotPaginate(
'Bot does not have add reactions permission.')
if not self.permissions.read_message_history:
raise CannotPaginate(
'Bot does not have Read Message History permission.')
def get_page(self, page):
base = (page - 1) * self.per_page
return self.entries[base:base + self.per_page]
def get_content(self, entries, page, *, first=False):
return None
def get_embed(self, entries, page, *, first=False):
self.prepare_embed(entries, page, first=first)
return self.embed
def prepare_embed(self, entries, page, *, first=False):
p = []
for index, entry in enumerate(entries,
1 + ((page - 1) * self.per_page)):
p.append(f'{index}. {entry}')
if self.maximum_pages > 1:
if self.show_entry_count:
text = f'Page {page}/{self.maximum_pages} ({len(self.entries)} entries)'
else:
text = f'Page {page}/{self.maximum_pages}'
self.embed.set_footer(text=text)
if self.paginating and first:
p.append('')
p.append(
'Confused? React with \N{INFORMATION SOURCE} for more info.')
self.embed.description = '\n'.join(p)
async def show_page(self, page, *, first=False):
self.current_page = page
entries = self.get_page(page)
content = self.get_content(entries, page, first=first)
embed = self.get_embed(entries, page, first=first)
if not self.paginating:
return await self.channel.send(content=content, embed=embed)
if not first:
await self.message.edit(content=content, embed=embed)
return
self.message = await self.channel.send(content=content, embed=embed)
for (reaction, _) in self.reaction_emojis:
if self.maximum_pages == 2 and reaction in ('\u23ed', '\u23ee'):
# no |<< or >>| buttons if we only have two pages
# we can't forbid it if someone ends up using it but remove
# it from the default set
continue
await self.message.add_reaction(reaction)
async def checked_show_page(self, page):
if page != 0 and page <= self.maximum_pages:
await self.show_page(page)
async def first_page(self):
"""goes to the first page"""
await self.show_page(1)
async def last_page(self):
"""goes to the last page"""
await self.show_page(self.maximum_pages)
async def next_page(self):
"""goes to the next page"""
await self.checked_show_page(self.current_page + 1)
async def previous_page(self):
"""goes to the previous page"""
await self.checked_show_page(self.current_page - 1)
async def show_current_page(self):
if self.paginating:
await self.show_page(self.current_page)
async def numbered_page(self):
"""lets you type a page number to go to"""
to_delete = []
to_delete.append(
await self.channel.send('What page do you want to go to?'))
def message_check(m):
return m.author == self.author and \
self.channel == m.channel and \
m.content.isdigit()
try:
msg = await self.bot.wait_for('message', check=message_check,
timeout=30.0)
except asyncio.TimeoutError:
to_delete.append(await self.channel.send('Took too long.'))
await asyncio.sleep(5)
else:
page = int(msg.content)
to_delete.append(msg)
if page != 0 and page <= self.maximum_pages:
await self.show_page(page)
else:
to_delete.append(await self.channel.send(
f'Invalid page given. ({page}/{self.maximum_pages})'))
await asyncio.sleep(5)
try:
await self.channel.delete_messages(to_delete)
except Exception:
pass
async def show_help(self):
"""shows this message"""
messages = ['Welcome to the interactive paginator!\n']
messages.append(
'This interactively allows you to see pages of text by navigating with ' \
'reactions. They are as follows:\n')
for (emoji, func) in self.reaction_emojis:
messages.append(f'{emoji} {func.__doc__}')
embed = self.embed.copy()
embed.clear_fields()
embed.description = '\n'.join(messages)
embed.set_footer(
text=f'We were on page {self.current_page} before this message.')
await self.message.edit(content=None, embed=embed)
async def go_back_to_current_page():
await asyncio.sleep(60.0)
await self.show_current_page()
self.bot.loop.create_task(go_back_to_current_page())
async def stop_pages(self):
"""stops the interactive pagination session"""
await self.message.delete()
self.paginating = False
def react_check(self, payload):
if payload.user_id != self.author.id:
return False
if payload.message_id != self.message.id:
return False
to_check = str(payload.emoji)
for (emoji, func) in self.reaction_emojis:
if to_check == emoji:
self.match = func
return True
return False
async def paginate(self):
"""Actually paginate the entries and run the interactive loop if necessary."""
first_page = self.show_page(1, first=True)
if not self.paginating:
await first_page
else:
# allow us to react to reactions right away if we're paginating
self.bot.loop.create_task(first_page)
while self.paginating:
try:
payload = await self.bot.wait_for('raw_reaction_add',
check=self.react_check,
timeout=120.0)
except asyncio.TimeoutError:
self.paginating = False
try:
await self.message.clear_reactions()
except:
pass
finally:
break
try:
await self.message.remove_reaction(payload.emoji,
discord.Object(
id=payload.user_id))
except:
pass # can't remove it so don't bother doing so
await self.match()
class FieldPages(Pages):
"""Similar to Pages except entries should be a list of
tuples having (key, value) to show as embed fields instead.
"""
def prepare_embed(self, entries, page, *, first=False):
self.embed.clear_fields()
self.embed.description = discord.Embed.Empty
for key, value in entries:
self.embed.add_field(name=key, value=value, inline=False)
if self.maximum_pages > 1:
if self.show_entry_count:
text = f'Page {page}/{self.maximum_pages} ({len(self.entries)} entries)'
else:
text = f'Page {page}/{self.maximum_pages}'
self.embed.set_footer(text=text)
class TextPages(Pages):
"""Uses a commands.Paginator internally to paginate some text."""
def __init__(self, ctx, text, *, prefix='```', suffix='```',
max_size=2000):
paginator = CommandPaginator(prefix=prefix, suffix=suffix,
max_size=max_size - 200)
for line in text.split('\n'):
paginator.add_line(line)
super().__init__(ctx, entries=paginator.pages, per_page=1,
show_entry_count=False)
def get_page(self, page):
return self.entries[page - 1]
def get_embed(self, entries, page, *, first=False):
return None
def get_content(self, entry, page, *, first=False):
if self.maximum_pages > 1:
return f'{entry}\nPage {page}/{self.maximum_pages}'
return entry

View file

@ -1,12 +0,0 @@
class Version:
def __init__(self, major: int, minor: int, patch: int, **kwargs):
self.major: int = major
self.minor: int = minor
self.patch: int = patch
self.pre_release = kwargs.get('pre_release', '')
self.build = kwargs.get('build', '')
def __str__(self) -> str:
build = self.build[:10]
return f'v{self.major}.{self.minor}.{self.patch}{self.pre_release}+{build}'