fix(gitignore): add packages to gitignore

This commit is contained in:
Romain J 2020-06-04 00:17:36 +02:00
parent 335397554f
commit 33fa6b7f1f
25 changed files with 30 additions and 1538 deletions

8
.gitignore vendored
View file

@ -7,6 +7,8 @@ __pycache__/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
venv/
settings.py
venv
dist
build
*.egg
*.egg-info

View file

@ -2,16 +2,31 @@
<project version="4">
<component name="ChangeListManager">
<list default="true" id="c97c8a30-7573-4dcd-a0d4-5bf94b8ddbbd" name="5ed57ed9960f35191182a924 core" comment="">
<change beforePath="$PROJECT_DIR$/.idea/tuxbot-bot-rewrite.iml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/tuxbot-bot-rewrite.iml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.gitignore" beforeDir="false" afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/README.md" beforeDir="false" afterPath="$PROJECT_DIR$/README.md" afterDir="false" />
<change beforePath="$PROJECT_DIR$/setup.cfg" beforeDir="false" afterPath="$PROJECT_DIR$/setup.cfg" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/__init__.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/__init__.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/__main__.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/__main__.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/cogs/network/network.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/cogs/network/network.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/core/bot.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/core/bot.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/logging.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/logging.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/setup.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/setup.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Tuxbot_bot.egg-info/PKG-INFO" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/Tuxbot_bot.egg-info/SOURCES.txt" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/Tuxbot_bot.egg-info/dependency_links.txt" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/Tuxbot_bot.egg-info/entry_points.txt" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/Tuxbot_bot.egg-info/requires.txt" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/Tuxbot_bot.egg-info/top_level.txt" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/__init__.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/__main__.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/cogs/images/__init__.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/cogs/images/images.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/cogs/logs/__init__.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/cogs/logs/logs.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/cogs/network/__init__.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/cogs/network/network.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/core/__init__.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/core/bot.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/core/config.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/core/data_manager.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/core/utils/functions/cli.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/core/utils/functions/extra.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/logging.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/build/lib/tuxbot/setup.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/dist/Tuxbot_bot-3.0.0-py3.8.egg" beforeDir="false" />
</list>
<list id="a3abf5c0-7587-46e4-8f09-88e34a1ab8a4" name="5ed41911b012e33f68a07e7a i18n" comment="" />
<list id="6566fca1-2e90-48bb-9e74-dd3badbaca99" name="Default Changelist" comment="" />
@ -122,7 +137,7 @@
<workItem from="1591049956280" duration="4910000" />
<workItem from="1591054878071" duration="1039000" />
<workItem from="1591088657371" duration="4107000" />
<workItem from="1591128560850" duration="38137000" />
<workItem from="1591128560850" duration="38413000" />
</task>
<option name="localTasksCounter" value="2" />
<option name="createBranch" value="false" />

View file

@ -1,11 +0,0 @@
Metadata-Version: 1.2
Name: Tuxbot-bot
Version: 3.0.0
Summary: A bot made for GnousEU and OpenSource
Home-page: https://git.gnous.eu/gnouseu/tuxbot-bot/
Author: Romain J.
Author-email: romain@gnous.eu
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
Requires-Python: >=3.7

View file

@ -1,25 +0,0 @@
README.md
setup.cfg
setup.py
Tuxbot_bot.egg-info/PKG-INFO
Tuxbot_bot.egg-info/SOURCES.txt
Tuxbot_bot.egg-info/dependency_links.txt
Tuxbot_bot.egg-info/entry_points.txt
Tuxbot_bot.egg-info/requires.txt
Tuxbot_bot.egg-info/top_level.txt
tuxbot/__init__.py
tuxbot/__main__.py
tuxbot/logging.py
tuxbot/setup.py
tuxbot/cogs/images/__init__.py
tuxbot/cogs/images/images.py
tuxbot/cogs/logs/__init__.py
tuxbot/cogs/logs/logs.py
tuxbot/cogs/network/__init__.py
tuxbot/cogs/network/network.py
tuxbot/core/__init__.py
tuxbot/core/bot.py
tuxbot/core/config.py
tuxbot/core/data_manager.py
tuxbot/core/utils/functions/cli.py
tuxbot/core/utils/functions/extra.py

View file

@ -1 +0,0 @@

View file

@ -1,4 +0,0 @@
[console_scripts]
tuxbot = tuxbot.__main__:main
tuxbot-setup = tuxbot.setup:setup

View file

@ -1,29 +0,0 @@
aiohttp==3.6.2
aiosqlite==0.13.0
async-timeout==3.0.1
asyncpg==0.20.1
attrs==19.3.0
cachetools==4.1.0
certifi==2020.4.5.1
chardet==3.0.4
ciso8601==2.1.3
colorama==0.4.3
discord-flags==2.1.1
discord.py==1.3.3
dnspython==1.16.0
humanize==2.4.0
idna==2.9
ipinfo==3.0.0
ipwhois==1.1.0
iso8601==0.1.12
multidict==4.7.6
psutil==5.7.0
PyPika==0.37.7
pytz==2020.1
requests==2.23.0
six==1.15.0
tortoise-orm==0.16.13
typing-extensions==3.7.4.2
urllib3==1.25.9
websockets==8.1
yarl==1.4.2

View file

@ -1 +0,0 @@
tuxbot

View file

@ -1,14 +0,0 @@
import subprocess
from collections import namedtuple
build = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']) \
.decode()
VersionInfo = namedtuple('VersionInfo', 'major minor micro releaselevel build')
version_info = VersionInfo(
major=3, minor=0, micro=0,
releaselevel='alpha', build=build
)
__version__ = "v{}.{}.{}" \
.format(version_info.major, version_info.minor, version_info.micro)

View file

@ -1,139 +0,0 @@
import argparse
import asyncio
import logging
import signal
import sys
import discord
from colorama import Fore, init, Style
import tuxbot.logging
from tuxbot.core import data_manager
from tuxbot.core.bot import Tux
log = logging.getLogger("tuxbot.main")
init()
def parse_cli_flags(args):
parser = argparse.ArgumentParser(
description="Tuxbot - OpenSource bot",
usage="tuxbot <instance_name> [arguments]"
)
parser.add_argument("--version", "-V", help="Show tuxbot's used version")
parser.add_argument("--list-instances", "-L",
help="List all instance names")
parser.add_argument(
"instance_name", nargs="?",
help="Name of the bot instance created during `redbot-setup`.")
args = parser.parse_args(args)
if args.prefix:
args.prefix = sorted(args.prefix, reverse=True)
else:
args.prefix = []
return args
async def shutdown_handler(tux, signal_type, exit_code=None):
if signal_type:
log.info("%s received. Quitting...", signal_type)
sys.exit(0)
elif exit_code is None:
log.info("Shutting down from unhandled exception")
tux.shutdown_code = 1
if exit_code is not None:
tux.shutdown_code = exit_code
try:
await tux.logout()
finally:
pending = [
t for t in asyncio.all_tasks() if t is not asyncio.current_task()
]
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)
async def run_bot(tux: Tux, cli_flags: argparse.Namespace) -> None:
data_path = data_manager.get_data_path(tux.instance_name)
tuxbot.logging.init_logging(
level=cli_flags.logging_level,
location=data_path / "logs"
)
log.debug("====Basic Config====")
log.debug("Data Path: %s", data_path)
if cli_flags.token:
token = cli_flags.token
else:
token = await tux.config.token()
if not token:
log.critical("Token must be set if you want to login.")
sys.exit(1)
try:
await tux.start(token, bot=True, cli_flags=cli_flags)
except discord.LoginFailure:
log.critical("This token appears to be valid.")
sys.exit(1)
return None
def main():
tux = None
cli_flags = parse_cli_flags(sys.argv[1:])
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
if cli_flags.no_instance:
print(Fore.RED
+ "No instance provided ! "
"You can use 'tuxbot -L' to list all available instances"
+ Style.RESET_ALL)
sys.exit(1)
tux = Tux(
cli_flags=cli_flags,
description="Tuxbot, made from and for OpenSource",
dm_help=None
)
loop.run_until_complete(run_bot(tux, cli_flags))
except KeyboardInterrupt:
log.warning("Please use <prefix>quit instead of Ctrl+C to Shutdown!")
log.error("Received KeyboardInterrupt")
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, signal.SIGINT))
except SystemExit as exc:
log.info("Shutting down with exit code: %s", exc.code)
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, exc.code))
except Exception as exc:
log.exception("Unexpected exception (%s): ", type(exc), exc_info=exc)
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, 1))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
log.info("Please wait, cleaning up a bit more")
loop.run_until_complete(asyncio.sleep(1))
asyncio.set_event_loop(None)
loop.stop()
loop.close()
exit_code = 1 if tux is None else tux.shutdown_code
sys.exit(exit_code)
if __name__ == "__main__":
main()

View file

@ -1,5 +0,0 @@
from .images import Images
def setup(bot):
bot.add_cog(Images(bot))

View file

@ -1,175 +0,0 @@
import logging
from io import BytesIO
import discord
from discord.ext import commands, flags
from app import TuxBot
from utils.functions.extra import ContextPlus, command_extra
log = logging.getLogger(__name__)
class Images(commands.Cog, name="Images"):
def __init__(self, bot):
self.bot = bot
self.image_api = "http://0.0.0.0:8080"
async def _send_meme(self, ctx: ContextPlus, endpoint: str, **passed_flags):
async with ctx.typing():
url = f"{self.image_api}/{endpoint}?"
for key, val in passed_flags.items():
if val:
url += f"{key}={val}&"
async with self.bot.session.get(url) as r:
if r.status != 200:
return await ctx.send("Failed...")
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
@command_extra(name="phcomment")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _phcomment(self, ctx: ContextPlus, user: discord.User = None, *, message: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
async with ctx.typing():
message = message.replace("&", "%26")
if user is None:
avatar = ctx.author.avatar_url_as(format='png')
username = ctx.author.name
else:
avatar = user.avatar_url_as(format='png')
username = user.name
url = f"{self.image_api}/ph/comment" \
f"?image={avatar}" \
f"&username={username}" \
f"&message={message}"
async with self.bot.session.get(url) as r:
if r.status != 200:
return await ctx.send("Failed...")
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
@command_extra(name="phvideo")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _phvideo(self, ctx: ContextPlus, image: str, author: discord.User, *, title: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
async with ctx.typing():
url = f"{self.image_api}/ph/video" \
f"?image={image}" \
f"&username={author.name}" \
f"&title={title}"
async with self.bot.session.get(url) as r:
if r.status != 200:
return await ctx.send("Failed...")
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@flags.add_flag("--text3", type=str)
@command_extra(name="balloon")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _balloon(self, ctx: ContextPlus, **passed_flags):
passed_flags["text3"] = passed_flags.get("text3")
passed_flags["text4"] = passed_flags.get("text1")
passed_flags["text5"] = passed_flags.get("text2")
await self._send_meme(ctx, 'balloon', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@flags.add_flag("--text3", type=str)
@command_extra(name="butterfly")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _butterfly(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'butterfly', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="buttons")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _buttons(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'buttons', **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="cmm")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _cmm(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'change_my_mind', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="drake")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _drake(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'drake', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="fry")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _fry(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'fry', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="imagination")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _imagination(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'imagination', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="everywhere")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _everywhere(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'everywhere', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@flags.add_flag("--text3", type=str)
@command_extra(name="choice")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _choice(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'choice', **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="pika")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _pika(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'pika', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@flags.add_flag("--text3", type=str)
@command_extra(name="pkp")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _pkp(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'pkp', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="puppet")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _puppet(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'puppet', **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="scroll_of_truth", alias=['sot'])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _sot(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'scroll_of_truth', **passed_flags)

View file

@ -1,14 +0,0 @@
import logging
from discord.ext import commands
from .logs import Logs, GatewayHandler, on_error
def setup(bot):
cog = Logs(bot)
bot.add_cog(cog)
handler = GatewayHandler(cog)
logging.getLogger().addHandler(handler)
commands.AutoShardedBot.on_error = on_error

View file

@ -1,323 +0,0 @@
"""
Based on https://github.com/Rapptz/RoboDanny/blob/3d94e89ef27f702a5f57f432a9131bdfb60bb3ec/cogs/stats.py
Adapted by Romain J.
"""
import asyncio
import datetime
import json
import logging
import textwrap
import traceback
from collections import defaultdict
import discord
import humanize
import psutil
from discord.ext import commands, tasks
from app import TuxBot
from utils.functions.extra import command_extra
log = logging.getLogger(__name__)
class GatewayHandler(logging.Handler):
def __init__(self, cog):
self.cog = cog
super().__init__(logging.INFO)
def filter(self, record):
return record.name == 'discord.gateway' \
or 'Shard ID' in record.msg \
or 'Websocket closed ' in record.msg
def emit(self, record):
self.cog.add_record(record)
class Logs(commands.Cog):
def __init__(self, bot: TuxBot):
self.bot = bot
self.process = psutil.Process()
self._batch_lock = asyncio.Lock(loop=bot.loop)
self._data_batch = []
self._gateway_queue = asyncio.Queue(loop=bot.loop)
self.gateway_worker.start()
self._resumes = []
self._identifies = defaultdict(list)
def _clear_gateway_data(self):
one_week_ago = datetime.datetime.utcnow() - datetime.timedelta(days=7)
to_remove = [
index for index, dt in enumerate(self._resumes)
if dt < one_week_ago
]
for index in reversed(to_remove):
del self._resumes[index]
for shard_id, dates in self._identifies.items():
to_remove = [index for index, dt in enumerate(dates) if
dt < one_week_ago]
for index in reversed(to_remove):
del dates[index]
@tasks.loop(seconds=0.0)
async def gateway_worker(self):
record = await self._gateway_queue.get()
await self.notify_gateway_status(record)
async def register_command(self, ctx):
if ctx.command is None:
return
command = ctx.command.qualified_name
self.bot.command_stats[command] += 1
message = ctx.message
if ctx.guild is None:
destination = 'Private Message'
guild_id = None
else:
destination = f'#{message.channel} ({message.guild})'
guild_id = ctx.guild.id
log.info(
f'{message.created_at}: {message.author} '
f'in {destination}: {message.content}')
async with self._batch_lock:
self._data_batch.append({
'guild': guild_id,
'channel': ctx.channel.id,
'author': ctx.author.id,
'used': message.created_at.isoformat(),
'prefix': ctx.prefix,
'command': command,
'failed': ctx.command_failed,
})
@commands.Cog.listener()
async def on_command_completion(self, ctx):
await self.register_command(ctx)
@commands.Cog.listener()
async def on_socket_response(self, msg):
self.bot.socket_stats[msg.get('t')] += 1
@property
def logs(self):
webhooks = {}
for key, value in self.bot.logs_channels.items():
webhooks[key] = discord.Webhook.partial(
id=value.get('webhook')['id'],
token=value.get('webhook')['token'],
adapter=discord.AsyncWebhookAdapter(
self.bot.session
)
)
return webhooks
async def log_error(self, *, ctx=None, extra=None):
e = discord.Embed(title='Error', colour=0xdd5f53)
e.description = f'```py\n{traceback.format_exc()}\n```'
e.add_field(name='Extra', value=extra, inline=False)
e.timestamp = datetime.datetime.utcnow()
if ctx is not None:
fmt = '{0} (ID: {0.id})'
author = fmt.format(ctx.author)
channel = fmt.format(ctx.channel)
guild = 'None' if ctx.guild is None else fmt.format(ctx.guild)
e.add_field(name='Author', value=author)
e.add_field(name='Channel', value=channel)
e.add_field(name='Guild', value=guild)
await self.logs.get('errors').send(embed=e)
async def send_guild_stats(self, e, guild):
e.add_field(name='Name', value=guild.name)
e.add_field(name='ID', value=guild.id)
e.add_field(name='Shard ID', value=guild.shard_id or 'N/A')
e.add_field(name='Owner',
value=f'{guild.owner} (ID: {guild.owner.id})')
bots = sum(member.bot for member in guild.members)
total = guild.member_count
online = sum(member.status is discord.Status.online
for member in guild.members)
e.add_field(name='Members', value=str(total))
e.add_field(name='Bots', value=f'{bots} ({bots / total:.2%})')
e.add_field(name='Online', value=f'{online} ({online / total:.2%})')
if guild.icon:
e.set_thumbnail(url=guild.icon_url)
if guild.me:
e.timestamp = guild.me.joined_at
await self.logs.get('guilds').send(embed=e)
@commands.Cog.listener()
async def on_guild_join(self, guild: discord.guild):
e = discord.Embed(colour=0x53dda4, title='New Guild') # green colour
await self.send_guild_stats(e, guild)
@commands.Cog.listener()
async def on_guild_remove(self, guild: discord.guild):
e = discord.Embed(colour=0xdd5f53, title='Left Guild') # red colour
await self.send_guild_stats(e, guild)
@commands.Cog.listener()
async def on_message(self, message: discord.message):
ctx = await self.bot.get_context(message)
if ctx.valid:
return
if isinstance(message.channel, discord.DMChannel):
if message.author is self.bot.user:
e = discord.Embed(
title=f"DM to: {message.channel.recipient}",
description=message.content,
color=0x39e326
)
else:
e = discord.Embed(
title="New DM:",
description=message.content,
color=0x0A97F5
)
e.set_author(
name=message.channel.recipient,
icon_url=message.channel.recipient.avatar_url_as(format="png")
)
if message.attachments:
attachment_url = message.attachments[0].url
e.set_image(url=attachment_url)
e.set_footer(
text=f"User ID: {message.channel.recipient.id}"
)
await self.logs["dm"].send(embed=e)
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
await self.register_command(ctx)
if not isinstance(error, (
commands.CommandInvokeError, commands.ConversionError)):
return
error = error.original
if isinstance(error, (discord.Forbidden, discord.NotFound)):
return
e = discord.Embed(title='Command Error', colour=0xcc3366)
e.add_field(name='Name', value=ctx.command.qualified_name)
e.add_field(name='Author', value=f'{ctx.author} (ID: {ctx.author.id})')
fmt = f'Channel: {ctx.channel} (ID: {ctx.channel.id})'
if ctx.guild:
fmt = f'{fmt}\nGuild: {ctx.guild} (ID: {ctx.guild.id})'
e.add_field(name='Location', value=fmt, inline=False)
e.add_field(name='Content', value=textwrap.shorten(
ctx.message.content,
width=512
))
exc = ''.join(traceback.format_exception(
type(error), error, error.__traceback__,
chain=False)
)
e.description = f'```py\n{exc}\n```'
e.timestamp = datetime.datetime.utcnow()
await self.logs.get('errors').send(embed=e)
@commands.Cog.listener()
async def on_socket_raw_send(self, data):
if '"op":2' not in data and '"op":6' not in data:
return
back_to_json = json.loads(data)
if back_to_json['op'] == 2:
payload = back_to_json['d']
inner_shard = payload.get('shard', [0])
self._identifies[inner_shard[0]].append(datetime.datetime.utcnow())
else:
self._resumes.append(datetime.datetime.utcnow())
self._clear_gateway_data()
def add_record(self, record):
self._gateway_queue.put_nowait(record)
async def notify_gateway_status(self, record):
types = {
'INFO': ':information_source:',
'WARNING': ':warning:'
}
emoji = types.get(record.levelname, ':heavy_multiplication_x:')
dt = datetime.datetime.utcfromtimestamp(record.created)
msg = f'{emoji} `[{dt:%Y-%m-%d %H:%M:%S}] {record.message}`'
await self.logs.get('gateway').send(msg)
@command_extra(name='commandstats')
@commands.is_owner()
async def _commandstats(self, ctx, limit=20):
counter = self.bot.command_stats
width = len(max(counter, key=len))
if limit > 0:
common = counter.most_common(limit)
else:
common = counter.most_common()[limit:]
output = '\n'.join(f'{k:<{width}}: {c}' for k, c in common)
await ctx.send(f'```\n{output}\n```')
@commands.command('socketstats')
@commands.is_owner()
async def _socketstats(self, ctx):
delta = datetime.datetime.utcnow() - self.bot.uptime
minutes = delta.total_seconds() / 60
total = sum(self.bot.socket_stats.values())
cpm = total / minutes
await ctx.send(
f'{total} socket events observed ({cpm:.2f}/minute):\n'
f'{self.bot.socket_stats}')
@commands.command('uptime')
async def _uptime(self, ctx):
uptime = humanize.naturaltime(
datetime.datetime.utcnow() - self.bot.uptime)
await ctx.send(f'Uptime: **{uptime}**')
async def on_error(self, event, *args):
e = discord.Embed(title='Event Error', colour=0xa32952)
e.add_field(name='Event', value=event)
e.description = f'```py\n{traceback.format_exc()}\n```'
e.timestamp = datetime.datetime.utcnow()
args_str = ['```py']
for index, arg in enumerate(args):
args_str.append(f'[{index}]: {arg!r}')
args_str.append('```')
e.add_field(name='Args', value='\n'.join(args_str), inline=False)
hook = self.get_cog('Logs').logs.get('errors')
try:
await hook.send(embed=e)
except (discord.HTTPException, discord.NotFound,
discord.Forbidden, discord.InvalidArgument):
pass

View file

@ -1,5 +0,0 @@
from .network import Network
def setup(bot):
bot.add_cog(Network(bot))

View file

@ -1,105 +0,0 @@
import logging
import socket
import ipinfo
import discord
from discord.ext import commands, flags
from ipwhois import Net
from ipwhois.asn import IPASN
from ipinfo.exceptions import RequestQuotaExceededError
from requests.exceptions import HTTPError
from app import TuxBot
from utils.functions.extra import ContextPlus, command_extra
log = logging.getLogger(__name__)
class Network(commands.Cog, name="Useless"):
def __init__(self, bot: TuxBot):
self.bot = bot
@flags.add_flag("-i", "--ip", type=str, default='v4',
choices=['v4', '4', 'v6', '6'])
@command_extra(name="iplocalise", aliases=['localiseip'])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _iplocalise(self, ctx: ContextPlus, target: str, **passed_flags):
loading = await ctx.send(
"_Récupération des informations..._", deletable=False
)
def get_hostname(dtl, tgt):
try:
return dtl.hostname
except AttributeError:
try:
return socket.gethostbyaddr(tgt)[0]
except (ValueError, socket.herror):
return 'N/A'
ip_type = passed_flags.get('ip')
target_copy = target
# clean https://, last /, ...
spltTgt = target.split("://")
target = spltTgt[
(0, 1)[len(spltTgt) > 1]
].split("?")[0].split('/')[0].split(':')[0].lower()
try:
target = socket.getaddrinfo(
target, None,
socket.AF_INET if ip_type in ['v4', '4'] else socket.AF_INET6
)[1][4][0]
except socket.gaierror:
return \
await ctx.send("Erreur, cette adresse n'est pas disponible.")
net = Net(target)
obj = IPASN(net)
ip_info = obj.lookup()
try:
handler = ipinfo.getHandler(self.bot.config.ipinfo)
details = handler.getDetails(target)
api_result = True
except (RequestQuotaExceededError, HTTPError):
details = None
api_result = False
if api_result:
belongs = f"{details.org}"
osm = f"https://www.openstreetmap.org/" \
f"?mlat={details.latitude}" \
f"&mlon={details.longitude}" \
f"#map=5/{details.latitude}/{details.longitude}" \
f"&layers=H"
region = f"[{details.city} - {details.region} " \
f"({details.country})]({osm})"
flag = f"https://www.countryflags.io/" \
f"{details.country}/shiny/64.png"
else:
belongs = f"{ip_info['asn_description']} (AS{ip_info['asn']})"
region = f"{ip_info['asn_country_code']}"
flag = f"https://www.countryflags.io/" \
f"{ip_info['asn_country_code']}/shiny/64.png"
e = discord.Embed(
title=f"**Information sur __{target_copy}__ :**"
f" `{target}`",
color=0x5858d7
)
e.add_field(name="Appartient à :", value=belongs)
e.add_field(name="RIR :", value=f"{ip_info['asn_registry']}")
e.add_field(name="Region :", value=region)
e.add_field(name="Nom de l'hôte :",
value=get_hostname(details, target), inline=False)
e.set_thumbnail(url=flag)
await loading.delete()
await ctx.send(embed=e)

View file

@ -1,8 +0,0 @@
from colorama import init
from .. import __version__, version_info, VersionInfo
from .config import Config
__all__ = ["Config", "__version__", "version_info", "VersionInfo"]
init()

View file

@ -1,65 +0,0 @@
from pathlib import Path
from discord.ext import commands
from . import Config
from . import data_manager
__all__ = ["Tux"]
class Tux(commands.AutoShardedBot):
def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs):
# by default, if the bot shutdown without any intervention,
# it's a crash
self.shutdown_code = 1
self.cli_flags = cli_flags
self.instance_name = self.cli_flags.instance_name
self.last_exception = None
self.config = Config(
data_manager.get_data_path(self.instance_name)
)
self.config.register_global(
token=None,
prefix=[],
owner=None,
whitelist=[],
blacklist=[],
locale="en-US",
embeds=True,
color=0x6E83D1,
disabled_commands=[]
)
self.config.register_guild(
prefix=[],
whitelist=[],
blacklist=[],
locale="en-US",
admin_role=[],
mod_role=[],
embeds=None,
ignored=False,
disabled_commands=[]
)
self.config.register_channel(
ignored=False
)
if "owner_ids" in kwargs:
kwargs["owner_ids"] = set(kwargs["owner_ids"])
else:
kwargs["owner_ids"] = self.config.owner_ids()
message_cache_size = 100_000
kwargs["max_messages"] = message_cache_size
self.max_messages = message_cache_size
self.uptime = None
self.main_dir = bot_dir
print(str(self.cli_flags), self.instance_name, self.config, self.owner_ids, self.main_dir)
exit()
super().__init__(*args, help_command=None, **kwargs)

View file

@ -1,41 +0,0 @@
from pathlib import Path
from typing import Any, NoReturn
class Config:
GLOBAL = "GLOBAL"
GUILD = "GUILD"
CHANNEL = "TEXT_CHANNEL"
ROLE = "ROLE"
MEMBER = "MEMBER"
USER = "USER"
def __init__(self, config_dir: Path):
self._defaults = {}
def __getattr__(self, item: str) -> dict:
return getattr(self._defaults, item)
def _register_default(self, key: str, **kwargs: Any):
...
def register_core(self, **kwargs) -> NoReturn:
self._register_default(self.GUILD, **kwargs)
def register_global(self, **kwargs) -> NoReturn:
self._register_default(self.GLOBAL, **kwargs)
def register_guild(self, **kwargs) -> NoReturn:
self._register_default(self.GUILD, **kwargs)
def register_channel(self, **kwargs) -> NoReturn:
self._register_default(self.CHANNEL, **kwargs)
def register_role(self, **kwargs) -> NoReturn:
self._register_default(self.ROLE, **kwargs)
def register_member(self, **kwargs) -> NoReturn:
self._register_default(self.MEMBER, **kwargs)
def register_user(self, **kwargs) -> NoReturn:
self._register_default(self.USER, **kwargs)

View file

@ -1,26 +0,0 @@
from pathlib import Path
import appdirs
app_dir = appdirs.AppDirs("Tuxbot-bot")
config_dir = Path(app_dir.user_config_dir)
config_file = config_dir / "config.json"
def get_data_path(instance_name: str) -> Path:
return Path(app_dir.user_data_dir) / "data" / instance_name
def get_core_path(instance_name: str) -> Path:
data_path = get_data_path(instance_name)
return data_path / "data" / instance_name / "core"
def get_cogs_path(instance_name: str) -> Path:
data_path = get_data_path(instance_name)
return data_path / "data" / instance_name / "cogs"
def get_cog_path(instance_name: str, cog_name: str) -> Path:
data_path = get_data_path(instance_name)
return data_path / "data" / instance_name / "cogs" / cog_name

View file

@ -1,89 +0,0 @@
import codecs
import itertools
import sys
def bordered(*columns: dict) -> str:
"""
credits to https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/develop/redbot/core/utils/chat_formatting.py
Get two blocks of text in a borders.
Note
----
This will only work with a monospaced font.
Parameters
----------
*columns : `sequence` of `str`
The columns of text, each being a list of lines in that column.
Returns
-------
str
The bordered text.
"""
encoder = codecs.getencoder(sys.stdout.encoding)
try:
encoder("┌┐└┘─│") # border symbols
except UnicodeEncodeError:
ascii_border = True
else:
ascii_border = False
borders = {
"TL": "+" if ascii_border else "", # Top-left
"TR": "+" if ascii_border else "", # Top-right
"BL": "+" if ascii_border else "", # Bottom-left
"BR": "+" if ascii_border else "", # Bottom-right
"HZ": "-" if ascii_border else "", # Horizontal
"VT": "|" if ascii_border else "", # Vertical
}
sep = " " * 4 # Separator between boxes
widths = tuple(
max(
len(row) for row in column.get('rows')
) + 9
for column in columns
) # width of each col
cols_done = [False] * len(columns) # whether or not each column is done
lines = [""]
for i, column in enumerate(columns):
lines[0] += "{TL}" + "{HZ}" + column.get('title') \
+ "{HZ}" * (widths[i] - len(column.get('title')) - 1) \
+ "{TR}" + sep
for line in itertools.zip_longest(
*[column.get('rows') for column in columns]
):
row = []
for colidx, column in enumerate(line):
width = widths[colidx]
done = cols_done[colidx]
if column is None:
if not done:
# bottom border of column
column = "{HZ}" * width
row.append("{BL}" + column + "{BR}")
cols_done[colidx] = True # mark column as done
else:
# leave empty
row.append(" " * (width + 2))
else:
column += " " * (width - len(column)) # append padded spaces
row.append("{VT}" + column + "{VT}")
lines.append(sep.join(row))
final_row = []
for width, done in zip(widths, cols_done):
if not done:
final_row.append("{BL}" + "{HZ}" * width + "{BR}")
else:
final_row.append(" " * (width + 2))
lines.append(sep.join(final_row))
return "\n".join(lines).format(**borders)

View file

@ -1,114 +0,0 @@
import ast
import asyncio
import json
import os
import discord
from discord.ext import commands, flags
from configs.bot.protected import protected
from configs.bot.settings import prefixes
class ContextPlus(commands.Context):
async def send(self, content=None, *args, **kwargs):
if content is not None:
for value in protected:
content = content.replace(
str(value),
'[Deleted]'
)
if kwargs.get('content') is not None:
for value in protected:
kwargs['content'] = kwargs['content'].replace(
str(value),
'[Deleted]'
)
if kwargs.get('embeds') is not None and len(kwargs.get('embeds')) > 0:
for i, embed in enumerate(kwargs.get('embeds')):
embed = str(kwargs.get('embed').to_dict())
for value in protected:
embed = embed.replace(str(value), '[Deleted]')
kwargs['embeds'][i] = discord.Embed.from_dict(
ast.literal_eval(embed)
)
if kwargs.get('embed') is not None:
embed = str(kwargs.get('embed').to_dict())
for value in protected:
embed = embed.replace(str(value), '[Deleted]')
kwargs['embed'] = discord.Embed.from_dict(
ast.literal_eval(embed)
)
if (hasattr(self.command, 'deletable') and self.command.deletable) \
and kwargs.pop('deletable', True):
message = await super().send(content, *args, **kwargs)
await message.add_reaction('🗑')
def check(reaction: discord.Reaction, user: discord.User):
return user == self.author \
and str(reaction.emoji) == '🗑' \
and reaction.message.id == message.id
try:
await self.bot.wait_for(
'reaction_add',
timeout=60.0,
check=check
)
except asyncio.TimeoutError:
await message.remove_reaction('🗑', self.bot.user)
else:
await message.delete()
return message
else:
return await super().send(content, *args, **kwargs)
class CommandPLus(flags.FlagCommand):
def __init__(self, function, **kwargs):
super().__init__(function, **kwargs)
self.deletable = kwargs.pop("deletable", True)
def command_extra(*args, **kwargs):
return commands.command(*args, **kwargs, cls=CommandPLus)
class GroupPlus(flags.FlagGroup):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.deletable = kwargs.pop("deletable", True)
def group_extra(*args, **kwargs):
return commands.group(*args, **kwargs, cls=GroupPlus)
async def get_prefix(bot, message):
custom_prefix = prefixes
if message.guild:
path = f"configs/guilds/{str(message.guild.id)}.json"
if os.path.exists(path):
with open(path) as f:
datas = json.load(f)
custom_prefix = datas["Prefix"]
return commands.when_mentioned_or(*custom_prefix)(bot, message)
def get_owners() -> list:
with open("configs/bot/whitelist.json") as f:
datas = json.load(f)
return datas['owners']
def get_blacklist() -> dict:
with open("configs/bot/blacklist.json") as f:
return json.load(f)

View file

@ -1,39 +0,0 @@
import logging.handlers
import logging
import pathlib
import sys
MAX_OLD_LOGS = 8
MAX_BYTES = 5_000_000
def init_logging(level: int, location: pathlib.Path) -> None:
dpy_logger = logging.getLogger("discord")
dpy_logger.setLevel(logging.WARN)
dpy_logger_file = location / 'discord.log'
base_logger = logging.getLogger("tuxbot")
base_logger.setLevel(level)
base_logger_file = location / 'tuxbot.log'
formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}",
datefmt="%Y-%m-%d %H:%M:%S", style="{"
)
dpy_handler = logging.handlers.RotatingFileHandler(
str(dpy_logger_file.resolve()),
maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
)
base_handler = logging.handlers.RotatingFileHandler(
str(base_logger_file.resolve()),
maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
)
dpy_logger.addHandler(dpy_handler)
base_logger.addHandler(base_handler)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
base_logger.addHandler(stdout_handler)
dpy_logger.addHandler(stdout_handler)

View file

@ -1,292 +0,0 @@
import json
import logging
import re
import sys
from pathlib import Path
from typing import NoReturn, Union, List, Set
import click
from colorama import Fore, Style, init
from tuxbot.core.data_manager import config_dir, app_dir
init()
try:
config_dir.mkdir(parents=True, exist_ok=True)
except PermissionError:
print(f"mkdir: cannot create directory '{config_dir}': Permission denied")
sys.exit(1)
config_file = config_dir / "config.json"
def load_existing_config() -> dict:
if not config_file.exists():
return {}
with config_file.open() as fs:
return json.load(fs)
instances_data = load_existing_config()
if not instances_data:
instances_list = []
else:
instances_list = list(instances_data.keys())
def save_config(name, data, delete=False):
_config = load_existing_config()
if delete and name in _config:
_config.pop(name)
else:
_config[name] = data
with config_file.open("w") as fs:
json.dump(_config, fs, indent=4)
def get_name() -> str:
name = ""
while not name:
print(
"What name do you want to give this instance?\n"
"(valid characters: A-Z, a-z, 0-9, _, -)"
)
name = input("> ")
if re.fullmatch(r"[a-zA-Z0-9_\-]*", name) is None:
print()
print(
Fore.RED
+ "ERROR: Invalid characters provided"
+ Style.RESET_ALL
)
name = ""
return name
def get_data_dir(instance_name: str) -> Path:
data_path = Path(app_dir.user_data_dir) / "data" / instance_name
data_path_input = ""
print()
def make_data_dir(path: Path) -> Union[Path, str]:
try:
path.mkdir(parents=True, exist_ok=True)
except OSError:
print()
print(
Fore.RED
+ f"mkdir: cannot create directory '{path}':"
f" Permission denied"
+ Style.RESET_ALL
)
path = ""
return path
while not data_path_input:
print(
"where do you want to save the configurations?\n"
"Press [enter] to keep the default path"
)
print()
print(f"Default: {data_path}")
data_path_input = input("> ")
if data_path_input != '':
data_path_input = Path(data_path_input)
try:
exists = data_path_input.exists()
except OSError:
print()
print(
Fore.RED
+ "Impossible to verify the validity of the path, "
"make sure it does not contain any invalid characters."
+ Style.RESET_ALL
)
data_path_input = ""
exists = False
if data_path_input and not exists:
data_path_input = make_data_dir(data_path_input)
else:
data_path_input = make_data_dir(data_path)
print()
print(
f"You have chosen {data_path_input} to be your config directory for "
f"`{instance_name}` instance"
)
if not click.confirm("Please confirm", default=True):
print("Rerun the process to redo this configuration.")
sys.exit(0)
(data_path_input / 'core').mkdir(parents=True, exist_ok=True)
(data_path_input / 'cogs').mkdir(parents=True, exist_ok=True)
(data_path_input / 'logs').mkdir(parents=True, exist_ok=True)
return data_path_input
def get_token() -> str:
token = ""
while not token:
print(
"Please enter the bot token\n"
"(you can find it at https://discord.com/developers/applications)"
)
token = input("> ")
if re.fullmatch(r"([a-zA-Z0-9]{24}\.[a-zA-Z0-9_]{6}\.[a-zA-Z0-9_\-]{27}|mfa\.[a-zA-Z0-9_\-]{84})", token) is None:
print(
Fore.RED
+ "ERROR: Invalid token provided"
+ Style.RESET_ALL
)
token = ""
return token
def get_multiple(question: str, confirmation: str, value_type: type)\
-> Set[Union[str, int]]:
print(question)
values = [value_type(input('> '))]
while click.confirm(confirmation, default=False):
values.append(value_type(input('> ')))
return set(values)
def additional_config() -> dict:
p = Path(r'tuxbot/cogs').glob('**/additional_config.json')
datas = {}
for file in p:
print()
cog_name = str(file.parent).split('/')[-1]
datas[cog_name] = {}
with file.open('r') as f:
data = json.load(f)
print(f"\n==Configuration for `{cog_name}` module==")
for key, value in data.items():
print()
print(value['description'])
datas[cog_name][key] = input('> ')
return datas
def finish_setup(data_dir: Path) -> NoReturn:
print("Now, it's time to finish this setup by giving bot informations\n")
token = get_token()
print()
prefixes = get_multiple(
"Choice a (or multiple) prefix for the bot",
"Add another prefix ?",
str
)
mentionable = click.confirm(
"Does the bot answer if it's mentioned?",
default=True
)
owners_id = get_multiple(
"Give the owner id of this bot",
"Add another owner ?",
int
)
cogs_config = additional_config()
core_file = data_dir / 'core' / 'settings.json'
core = {
'token': token,
'prefixes': prefixes,
'mentionable': mentionable,
'owners_id': owners_id,
}
with core_file.open("w") as fs:
json.dump(core, fs, indent=4)
for cog, data in cogs_config.items():
data_cog_dir = data_dir / 'cogs' / cog
data_cog_dir.mkdir(parents=True, exist_ok=True)
data_cog_file = data_cog_dir / 'settings.json'
with data_cog_file.open("w") as fs:
json.dump(data, fs, indent=4)
def basic_setup() -> NoReturn:
print("Hi ! it's time for you to give me informations about you instance")
name = get_name()
data_dir = get_data_dir(name)
configs = load_existing_config()
instance_config = configs[name] if name in instances_list else {}
instance_config["DATA_PATH"] = str(data_dir.resolve())
instance_config["IS_RUNNING"] = False
if name in instances_list:
print()
print(
Fore.RED
+ f"WARNING: An instance named `{name}` already exists "
f"Continuing will overwrite this instance configs."
+ Style.RESET_ALL
)
if not click.confirm("Are you sure you want to continue?",
default=False):
print("Abandon...")
sys.exit(0)
save_config(name, instance_config)
print("\n"*4)
finish_setup(data_dir)
print()
print(
f"Instance successfully created! "
f"You can now run `tuxbot {name}` to launch this instance"
)
def setup():
try:
"""Create a new instance."""
level = logging.DEBUG
base_logger = logging.getLogger("tuxbot")
base_logger.setLevel(level)
formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}",
datefmt="%Y-%m-%d %H:%M:%S", style="{"
)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
base_logger.addHandler(stdout_handler)
basic_setup()
except KeyboardInterrupt:
print("Exiting...")
if __name__ == "__main__":
setup()

Binary file not shown.