feat(botCore|launcher): add features to the core launcher

This commit is contained in:
Romain J 2020-06-04 00:14:50 +02:00
parent cbe250f137
commit 335397554f
33 changed files with 1680 additions and 174 deletions

View file

@ -2,6 +2,8 @@
<module type="PYTHON_MODULE" version="4"> <module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$"> <content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/build" />
<excludeFolder url="file://$MODULE_DIR$/dist" />
<excludeFolder url="file://$MODULE_DIR$/venv" /> <excludeFolder url="file://$MODULE_DIR$/venv" />
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />

View file

@ -2,10 +2,15 @@
<project version="4"> <project version="4">
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="c97c8a30-7573-4dcd-a0d4-5bf94b8ddbbd" name="5ed57ed9960f35191182a924 core" comment=""> <list default="true" id="c97c8a30-7573-4dcd-a0d4-5bf94b8ddbbd" name="5ed57ed9960f35191182a924 core" comment="">
<change beforePath="$PROJECT_DIR$/.idea/tuxbot-bot-rewrite.iml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/tuxbot-bot-rewrite.iml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/README.md" beforeDir="false" afterPath="$PROJECT_DIR$/README.md" afterDir="false" />
<change beforePath="$PROJECT_DIR$/setup.cfg" beforeDir="false" afterPath="$PROJECT_DIR$/setup.cfg" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/__init__.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/__init__.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/__main__.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/__main__.py" afterDir="false" /> <change beforePath="$PROJECT_DIR$/tuxbot/__main__.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/__main__.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/cogs/network/network.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/cogs/network/network.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/core/bot.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/core/bot.py" afterDir="false" /> <change beforePath="$PROJECT_DIR$/tuxbot/core/bot.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/core/bot.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/core/models/__init__.py" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/tuxbot/logging.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/logging.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tuxbot/setup.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/setup.py" afterDir="false" /> <change beforePath="$PROJECT_DIR$/tuxbot/setup.py" beforeDir="false" afterPath="$PROJECT_DIR$/tuxbot/setup.py" afterDir="false" />
</list> </list>
<list id="a3abf5c0-7587-46e4-8f09-88e34a1ab8a4" name="5ed41911b012e33f68a07e7a i18n" comment="" /> <list id="a3abf5c0-7587-46e4-8f09-88e34a1ab8a4" name="5ed41911b012e33f68a07e7a i18n" comment="" />
@ -117,7 +122,7 @@
<workItem from="1591049956280" duration="4910000" /> <workItem from="1591049956280" duration="4910000" />
<workItem from="1591054878071" duration="1039000" /> <workItem from="1591054878071" duration="1039000" />
<workItem from="1591088657371" duration="4107000" /> <workItem from="1591088657371" duration="4107000" />
<workItem from="1591128560850" duration="29106000" /> <workItem from="1591128560850" duration="38137000" />
</task> </task>
<option name="localTasksCounter" value="2" /> <option name="localTasksCounter" value="2" />
<option name="createBranch" value="false" /> <option name="createBranch" value="false" />

View file

@ -52,7 +52,7 @@ source ~/tuxvenv/bin/activate
Now, you can finish the installation by executing this single command: Now, you can finish the installation by executing this single command:
```shell script ```shell script
python setup.py install pip install .
``` ```
## Configuration ## Configuration

View file

@ -0,0 +1,11 @@
Metadata-Version: 1.2
Name: Tuxbot-bot
Version: 3.0.0
Summary: A bot made for GnousEU and OpenSource
Home-page: https://git.gnous.eu/gnouseu/tuxbot-bot/
Author: Romain J.
Author-email: romain@gnous.eu
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
Requires-Python: >=3.7

View file

@ -0,0 +1,25 @@
README.md
setup.cfg
setup.py
Tuxbot_bot.egg-info/PKG-INFO
Tuxbot_bot.egg-info/SOURCES.txt
Tuxbot_bot.egg-info/dependency_links.txt
Tuxbot_bot.egg-info/entry_points.txt
Tuxbot_bot.egg-info/requires.txt
Tuxbot_bot.egg-info/top_level.txt
tuxbot/__init__.py
tuxbot/__main__.py
tuxbot/logging.py
tuxbot/setup.py
tuxbot/cogs/images/__init__.py
tuxbot/cogs/images/images.py
tuxbot/cogs/logs/__init__.py
tuxbot/cogs/logs/logs.py
tuxbot/cogs/network/__init__.py
tuxbot/cogs/network/network.py
tuxbot/core/__init__.py
tuxbot/core/bot.py
tuxbot/core/config.py
tuxbot/core/data_manager.py
tuxbot/core/utils/functions/cli.py
tuxbot/core/utils/functions/extra.py

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,4 @@
[console_scripts]
tuxbot = tuxbot.__main__:main
tuxbot-setup = tuxbot.setup:setup

View file

@ -0,0 +1,29 @@
aiohttp==3.6.2
aiosqlite==0.13.0
async-timeout==3.0.1
asyncpg==0.20.1
attrs==19.3.0
cachetools==4.1.0
certifi==2020.4.5.1
chardet==3.0.4
ciso8601==2.1.3
colorama==0.4.3
discord-flags==2.1.1
discord.py==1.3.3
dnspython==1.16.0
humanize==2.4.0
idna==2.9
ipinfo==3.0.0
ipwhois==1.1.0
iso8601==0.1.12
multidict==4.7.6
psutil==5.7.0
PyPika==0.37.7
pytz==2020.1
requests==2.23.0
six==1.15.0
tortoise-orm==0.16.13
typing-extensions==3.7.4.2
urllib3==1.25.9
websockets==8.1
yarl==1.4.2

View file

@ -0,0 +1 @@
tuxbot

View file

@ -0,0 +1,14 @@
import subprocess
from collections import namedtuple
build = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']) \
.decode()
VersionInfo = namedtuple('VersionInfo', 'major minor micro releaselevel build')
version_info = VersionInfo(
major=3, minor=0, micro=0,
releaselevel='alpha', build=build
)
__version__ = "v{}.{}.{}" \
.format(version_info.major, version_info.minor, version_info.micro)

View file

@ -0,0 +1,139 @@
import argparse
import asyncio
import logging
import signal
import sys
import discord
from colorama import Fore, init, Style
import tuxbot.logging
from tuxbot.core import data_manager
from tuxbot.core.bot import Tux
log = logging.getLogger("tuxbot.main")
init()
def parse_cli_flags(args):
parser = argparse.ArgumentParser(
description="Tuxbot - OpenSource bot",
usage="tuxbot <instance_name> [arguments]"
)
parser.add_argument("--version", "-V", help="Show tuxbot's used version")
parser.add_argument("--list-instances", "-L",
help="List all instance names")
parser.add_argument(
"instance_name", nargs="?",
help="Name of the bot instance created during `redbot-setup`.")
args = parser.parse_args(args)
if args.prefix:
args.prefix = sorted(args.prefix, reverse=True)
else:
args.prefix = []
return args
async def shutdown_handler(tux, signal_type, exit_code=None):
if signal_type:
log.info("%s received. Quitting...", signal_type)
sys.exit(0)
elif exit_code is None:
log.info("Shutting down from unhandled exception")
tux.shutdown_code = 1
if exit_code is not None:
tux.shutdown_code = exit_code
try:
await tux.logout()
finally:
pending = [
t for t in asyncio.all_tasks() if t is not asyncio.current_task()
]
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)
async def run_bot(tux: Tux, cli_flags: argparse.Namespace) -> None:
data_path = data_manager.get_data_path(tux.instance_name)
tuxbot.logging.init_logging(
level=cli_flags.logging_level,
location=data_path / "logs"
)
log.debug("====Basic Config====")
log.debug("Data Path: %s", data_path)
if cli_flags.token:
token = cli_flags.token
else:
token = await tux.config.token()
if not token:
log.critical("Token must be set if you want to login.")
sys.exit(1)
try:
await tux.start(token, bot=True, cli_flags=cli_flags)
except discord.LoginFailure:
log.critical("This token appears to be valid.")
sys.exit(1)
return None
def main():
tux = None
cli_flags = parse_cli_flags(sys.argv[1:])
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
if cli_flags.no_instance:
print(Fore.RED
+ "No instance provided ! "
"You can use 'tuxbot -L' to list all available instances"
+ Style.RESET_ALL)
sys.exit(1)
tux = Tux(
cli_flags=cli_flags,
description="Tuxbot, made from and for OpenSource",
dm_help=None
)
loop.run_until_complete(run_bot(tux, cli_flags))
except KeyboardInterrupt:
log.warning("Please use <prefix>quit instead of Ctrl+C to Shutdown!")
log.error("Received KeyboardInterrupt")
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, signal.SIGINT))
except SystemExit as exc:
log.info("Shutting down with exit code: %s", exc.code)
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, exc.code))
except Exception as exc:
log.exception("Unexpected exception (%s): ", type(exc), exc_info=exc)
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, 1))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
log.info("Please wait, cleaning up a bit more")
loop.run_until_complete(asyncio.sleep(1))
asyncio.set_event_loop(None)
loop.stop()
loop.close()
exit_code = 1 if tux is None else tux.shutdown_code
sys.exit(exit_code)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,5 @@
from .images import Images
def setup(bot):
bot.add_cog(Images(bot))

View file

@ -0,0 +1,175 @@
import logging
from io import BytesIO
import discord
from discord.ext import commands, flags
from app import TuxBot
from utils.functions.extra import ContextPlus, command_extra
log = logging.getLogger(__name__)
class Images(commands.Cog, name="Images"):
def __init__(self, bot):
self.bot = bot
self.image_api = "http://0.0.0.0:8080"
async def _send_meme(self, ctx: ContextPlus, endpoint: str, **passed_flags):
async with ctx.typing():
url = f"{self.image_api}/{endpoint}?"
for key, val in passed_flags.items():
if val:
url += f"{key}={val}&"
async with self.bot.session.get(url) as r:
if r.status != 200:
return await ctx.send("Failed...")
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
@command_extra(name="phcomment")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _phcomment(self, ctx: ContextPlus, user: discord.User = None, *, message: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
async with ctx.typing():
message = message.replace("&", "%26")
if user is None:
avatar = ctx.author.avatar_url_as(format='png')
username = ctx.author.name
else:
avatar = user.avatar_url_as(format='png')
username = user.name
url = f"{self.image_api}/ph/comment" \
f"?image={avatar}" \
f"&username={username}" \
f"&message={message}"
async with self.bot.session.get(url) as r:
if r.status != 200:
return await ctx.send("Failed...")
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
@command_extra(name="phvideo")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _phvideo(self, ctx: ContextPlus, image: str, author: discord.User, *, title: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
async with ctx.typing():
url = f"{self.image_api}/ph/video" \
f"?image={image}" \
f"&username={author.name}" \
f"&title={title}"
async with self.bot.session.get(url) as r:
if r.status != 200:
return await ctx.send("Failed...")
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@flags.add_flag("--text3", type=str)
@command_extra(name="balloon")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _balloon(self, ctx: ContextPlus, **passed_flags):
passed_flags["text3"] = passed_flags.get("text3")
passed_flags["text4"] = passed_flags.get("text1")
passed_flags["text5"] = passed_flags.get("text2")
await self._send_meme(ctx, 'balloon', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@flags.add_flag("--text3", type=str)
@command_extra(name="butterfly")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _butterfly(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'butterfly', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="buttons")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _buttons(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'buttons', **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="cmm")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _cmm(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'change_my_mind', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="drake")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _drake(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'drake', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="fry")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _fry(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'fry', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="imagination")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _imagination(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'imagination', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="everywhere")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _everywhere(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'everywhere', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@flags.add_flag("--text3", type=str)
@command_extra(name="choice")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _choice(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'choice', **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="pika")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _pika(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'pika', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@flags.add_flag("--text3", type=str)
@command_extra(name="pkp")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _pkp(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'pkp', **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="puppet")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _puppet(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'puppet', **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="scroll_of_truth", alias=['sot'])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _sot(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'scroll_of_truth', **passed_flags)

View file

@ -0,0 +1,14 @@
import logging
from discord.ext import commands
from .logs import Logs, GatewayHandler, on_error
def setup(bot):
cog = Logs(bot)
bot.add_cog(cog)
handler = GatewayHandler(cog)
logging.getLogger().addHandler(handler)
commands.AutoShardedBot.on_error = on_error

View file

@ -0,0 +1,323 @@
"""
Based on https://github.com/Rapptz/RoboDanny/blob/3d94e89ef27f702a5f57f432a9131bdfb60bb3ec/cogs/stats.py
Adapted by Romain J.
"""
import asyncio
import datetime
import json
import logging
import textwrap
import traceback
from collections import defaultdict
import discord
import humanize
import psutil
from discord.ext import commands, tasks
from app import TuxBot
from utils.functions.extra import command_extra
log = logging.getLogger(__name__)
class GatewayHandler(logging.Handler):
def __init__(self, cog):
self.cog = cog
super().__init__(logging.INFO)
def filter(self, record):
return record.name == 'discord.gateway' \
or 'Shard ID' in record.msg \
or 'Websocket closed ' in record.msg
def emit(self, record):
self.cog.add_record(record)
class Logs(commands.Cog):
def __init__(self, bot: TuxBot):
self.bot = bot
self.process = psutil.Process()
self._batch_lock = asyncio.Lock(loop=bot.loop)
self._data_batch = []
self._gateway_queue = asyncio.Queue(loop=bot.loop)
self.gateway_worker.start()
self._resumes = []
self._identifies = defaultdict(list)
def _clear_gateway_data(self):
one_week_ago = datetime.datetime.utcnow() - datetime.timedelta(days=7)
to_remove = [
index for index, dt in enumerate(self._resumes)
if dt < one_week_ago
]
for index in reversed(to_remove):
del self._resumes[index]
for shard_id, dates in self._identifies.items():
to_remove = [index for index, dt in enumerate(dates) if
dt < one_week_ago]
for index in reversed(to_remove):
del dates[index]
@tasks.loop(seconds=0.0)
async def gateway_worker(self):
record = await self._gateway_queue.get()
await self.notify_gateway_status(record)
async def register_command(self, ctx):
if ctx.command is None:
return
command = ctx.command.qualified_name
self.bot.command_stats[command] += 1
message = ctx.message
if ctx.guild is None:
destination = 'Private Message'
guild_id = None
else:
destination = f'#{message.channel} ({message.guild})'
guild_id = ctx.guild.id
log.info(
f'{message.created_at}: {message.author} '
f'in {destination}: {message.content}')
async with self._batch_lock:
self._data_batch.append({
'guild': guild_id,
'channel': ctx.channel.id,
'author': ctx.author.id,
'used': message.created_at.isoformat(),
'prefix': ctx.prefix,
'command': command,
'failed': ctx.command_failed,
})
@commands.Cog.listener()
async def on_command_completion(self, ctx):
await self.register_command(ctx)
@commands.Cog.listener()
async def on_socket_response(self, msg):
self.bot.socket_stats[msg.get('t')] += 1
@property
def logs(self):
webhooks = {}
for key, value in self.bot.logs_channels.items():
webhooks[key] = discord.Webhook.partial(
id=value.get('webhook')['id'],
token=value.get('webhook')['token'],
adapter=discord.AsyncWebhookAdapter(
self.bot.session
)
)
return webhooks
async def log_error(self, *, ctx=None, extra=None):
e = discord.Embed(title='Error', colour=0xdd5f53)
e.description = f'```py\n{traceback.format_exc()}\n```'
e.add_field(name='Extra', value=extra, inline=False)
e.timestamp = datetime.datetime.utcnow()
if ctx is not None:
fmt = '{0} (ID: {0.id})'
author = fmt.format(ctx.author)
channel = fmt.format(ctx.channel)
guild = 'None' if ctx.guild is None else fmt.format(ctx.guild)
e.add_field(name='Author', value=author)
e.add_field(name='Channel', value=channel)
e.add_field(name='Guild', value=guild)
await self.logs.get('errors').send(embed=e)
async def send_guild_stats(self, e, guild):
e.add_field(name='Name', value=guild.name)
e.add_field(name='ID', value=guild.id)
e.add_field(name='Shard ID', value=guild.shard_id or 'N/A')
e.add_field(name='Owner',
value=f'{guild.owner} (ID: {guild.owner.id})')
bots = sum(member.bot for member in guild.members)
total = guild.member_count
online = sum(member.status is discord.Status.online
for member in guild.members)
e.add_field(name='Members', value=str(total))
e.add_field(name='Bots', value=f'{bots} ({bots / total:.2%})')
e.add_field(name='Online', value=f'{online} ({online / total:.2%})')
if guild.icon:
e.set_thumbnail(url=guild.icon_url)
if guild.me:
e.timestamp = guild.me.joined_at
await self.logs.get('guilds').send(embed=e)
@commands.Cog.listener()
async def on_guild_join(self, guild: discord.guild):
e = discord.Embed(colour=0x53dda4, title='New Guild') # green colour
await self.send_guild_stats(e, guild)
@commands.Cog.listener()
async def on_guild_remove(self, guild: discord.guild):
e = discord.Embed(colour=0xdd5f53, title='Left Guild') # red colour
await self.send_guild_stats(e, guild)
@commands.Cog.listener()
async def on_message(self, message: discord.message):
ctx = await self.bot.get_context(message)
if ctx.valid:
return
if isinstance(message.channel, discord.DMChannel):
if message.author is self.bot.user:
e = discord.Embed(
title=f"DM to: {message.channel.recipient}",
description=message.content,
color=0x39e326
)
else:
e = discord.Embed(
title="New DM:",
description=message.content,
color=0x0A97F5
)
e.set_author(
name=message.channel.recipient,
icon_url=message.channel.recipient.avatar_url_as(format="png")
)
if message.attachments:
attachment_url = message.attachments[0].url
e.set_image(url=attachment_url)
e.set_footer(
text=f"User ID: {message.channel.recipient.id}"
)
await self.logs["dm"].send(embed=e)
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
await self.register_command(ctx)
if not isinstance(error, (
commands.CommandInvokeError, commands.ConversionError)):
return
error = error.original
if isinstance(error, (discord.Forbidden, discord.NotFound)):
return
e = discord.Embed(title='Command Error', colour=0xcc3366)
e.add_field(name='Name', value=ctx.command.qualified_name)
e.add_field(name='Author', value=f'{ctx.author} (ID: {ctx.author.id})')
fmt = f'Channel: {ctx.channel} (ID: {ctx.channel.id})'
if ctx.guild:
fmt = f'{fmt}\nGuild: {ctx.guild} (ID: {ctx.guild.id})'
e.add_field(name='Location', value=fmt, inline=False)
e.add_field(name='Content', value=textwrap.shorten(
ctx.message.content,
width=512
))
exc = ''.join(traceback.format_exception(
type(error), error, error.__traceback__,
chain=False)
)
e.description = f'```py\n{exc}\n```'
e.timestamp = datetime.datetime.utcnow()
await self.logs.get('errors').send(embed=e)
@commands.Cog.listener()
async def on_socket_raw_send(self, data):
if '"op":2' not in data and '"op":6' not in data:
return
back_to_json = json.loads(data)
if back_to_json['op'] == 2:
payload = back_to_json['d']
inner_shard = payload.get('shard', [0])
self._identifies[inner_shard[0]].append(datetime.datetime.utcnow())
else:
self._resumes.append(datetime.datetime.utcnow())
self._clear_gateway_data()
def add_record(self, record):
self._gateway_queue.put_nowait(record)
async def notify_gateway_status(self, record):
types = {
'INFO': ':information_source:',
'WARNING': ':warning:'
}
emoji = types.get(record.levelname, ':heavy_multiplication_x:')
dt = datetime.datetime.utcfromtimestamp(record.created)
msg = f'{emoji} `[{dt:%Y-%m-%d %H:%M:%S}] {record.message}`'
await self.logs.get('gateway').send(msg)
@command_extra(name='commandstats')
@commands.is_owner()
async def _commandstats(self, ctx, limit=20):
counter = self.bot.command_stats
width = len(max(counter, key=len))
if limit > 0:
common = counter.most_common(limit)
else:
common = counter.most_common()[limit:]
output = '\n'.join(f'{k:<{width}}: {c}' for k, c in common)
await ctx.send(f'```\n{output}\n```')
@commands.command('socketstats')
@commands.is_owner()
async def _socketstats(self, ctx):
delta = datetime.datetime.utcnow() - self.bot.uptime
minutes = delta.total_seconds() / 60
total = sum(self.bot.socket_stats.values())
cpm = total / minutes
await ctx.send(
f'{total} socket events observed ({cpm:.2f}/minute):\n'
f'{self.bot.socket_stats}')
@commands.command('uptime')
async def _uptime(self, ctx):
uptime = humanize.naturaltime(
datetime.datetime.utcnow() - self.bot.uptime)
await ctx.send(f'Uptime: **{uptime}**')
async def on_error(self, event, *args):
e = discord.Embed(title='Event Error', colour=0xa32952)
e.add_field(name='Event', value=event)
e.description = f'```py\n{traceback.format_exc()}\n```'
e.timestamp = datetime.datetime.utcnow()
args_str = ['```py']
for index, arg in enumerate(args):
args_str.append(f'[{index}]: {arg!r}')
args_str.append('```')
e.add_field(name='Args', value='\n'.join(args_str), inline=False)
hook = self.get_cog('Logs').logs.get('errors')
try:
await hook.send(embed=e)
except (discord.HTTPException, discord.NotFound,
discord.Forbidden, discord.InvalidArgument):
pass

View file

@ -0,0 +1,5 @@
from .network import Network
def setup(bot):
bot.add_cog(Network(bot))

View file

@ -0,0 +1,105 @@
import logging
import socket
import ipinfo
import discord
from discord.ext import commands, flags
from ipwhois import Net
from ipwhois.asn import IPASN
from ipinfo.exceptions import RequestQuotaExceededError
from requests.exceptions import HTTPError
from app import TuxBot
from utils.functions.extra import ContextPlus, command_extra
log = logging.getLogger(__name__)
class Network(commands.Cog, name="Useless"):
def __init__(self, bot: TuxBot):
self.bot = bot
@flags.add_flag("-i", "--ip", type=str, default='v4',
choices=['v4', '4', 'v6', '6'])
@command_extra(name="iplocalise", aliases=['localiseip'])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _iplocalise(self, ctx: ContextPlus, target: str, **passed_flags):
loading = await ctx.send(
"_Récupération des informations..._", deletable=False
)
def get_hostname(dtl, tgt):
try:
return dtl.hostname
except AttributeError:
try:
return socket.gethostbyaddr(tgt)[0]
except (ValueError, socket.herror):
return 'N/A'
ip_type = passed_flags.get('ip')
target_copy = target
# clean https://, last /, ...
spltTgt = target.split("://")
target = spltTgt[
(0, 1)[len(spltTgt) > 1]
].split("?")[0].split('/')[0].split(':')[0].lower()
try:
target = socket.getaddrinfo(
target, None,
socket.AF_INET if ip_type in ['v4', '4'] else socket.AF_INET6
)[1][4][0]
except socket.gaierror:
return \
await ctx.send("Erreur, cette adresse n'est pas disponible.")
net = Net(target)
obj = IPASN(net)
ip_info = obj.lookup()
try:
handler = ipinfo.getHandler(self.bot.config.ipinfo)
details = handler.getDetails(target)
api_result = True
except (RequestQuotaExceededError, HTTPError):
details = None
api_result = False
if api_result:
belongs = f"{details.org}"
osm = f"https://www.openstreetmap.org/" \
f"?mlat={details.latitude}" \
f"&mlon={details.longitude}" \
f"#map=5/{details.latitude}/{details.longitude}" \
f"&layers=H"
region = f"[{details.city} - {details.region} " \
f"({details.country})]({osm})"
flag = f"https://www.countryflags.io/" \
f"{details.country}/shiny/64.png"
else:
belongs = f"{ip_info['asn_description']} (AS{ip_info['asn']})"
region = f"{ip_info['asn_country_code']}"
flag = f"https://www.countryflags.io/" \
f"{ip_info['asn_country_code']}/shiny/64.png"
e = discord.Embed(
title=f"**Information sur __{target_copy}__ :**"
f" `{target}`",
color=0x5858d7
)
e.add_field(name="Appartient à :", value=belongs)
e.add_field(name="RIR :", value=f"{ip_info['asn_registry']}")
e.add_field(name="Region :", value=region)
e.add_field(name="Nom de l'hôte :",
value=get_hostname(details, target), inline=False)
e.set_thumbnail(url=flag)
await loading.delete()
await ctx.send(embed=e)

View file

@ -0,0 +1,8 @@
from colorama import init
from .. import __version__, version_info, VersionInfo
from .config import Config
__all__ = ["Config", "__version__", "version_info", "VersionInfo"]
init()

View file

@ -0,0 +1,65 @@
from pathlib import Path
from discord.ext import commands
from . import Config
from . import data_manager
__all__ = ["Tux"]
class Tux(commands.AutoShardedBot):
def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs):
# by default, if the bot shutdown without any intervention,
# it's a crash
self.shutdown_code = 1
self.cli_flags = cli_flags
self.instance_name = self.cli_flags.instance_name
self.last_exception = None
self.config = Config(
data_manager.get_data_path(self.instance_name)
)
self.config.register_global(
token=None,
prefix=[],
owner=None,
whitelist=[],
blacklist=[],
locale="en-US",
embeds=True,
color=0x6E83D1,
disabled_commands=[]
)
self.config.register_guild(
prefix=[],
whitelist=[],
blacklist=[],
locale="en-US",
admin_role=[],
mod_role=[],
embeds=None,
ignored=False,
disabled_commands=[]
)
self.config.register_channel(
ignored=False
)
if "owner_ids" in kwargs:
kwargs["owner_ids"] = set(kwargs["owner_ids"])
else:
kwargs["owner_ids"] = self.config.owner_ids()
message_cache_size = 100_000
kwargs["max_messages"] = message_cache_size
self.max_messages = message_cache_size
self.uptime = None
self.main_dir = bot_dir
print(str(self.cli_flags), self.instance_name, self.config, self.owner_ids, self.main_dir)
exit()
super().__init__(*args, help_command=None, **kwargs)

View file

@ -0,0 +1,41 @@
from pathlib import Path
from typing import Any, NoReturn
class Config:
GLOBAL = "GLOBAL"
GUILD = "GUILD"
CHANNEL = "TEXT_CHANNEL"
ROLE = "ROLE"
MEMBER = "MEMBER"
USER = "USER"
def __init__(self, config_dir: Path):
self._defaults = {}
def __getattr__(self, item: str) -> dict:
return getattr(self._defaults, item)
def _register_default(self, key: str, **kwargs: Any):
...
def register_core(self, **kwargs) -> NoReturn:
self._register_default(self.GUILD, **kwargs)
def register_global(self, **kwargs) -> NoReturn:
self._register_default(self.GLOBAL, **kwargs)
def register_guild(self, **kwargs) -> NoReturn:
self._register_default(self.GUILD, **kwargs)
def register_channel(self, **kwargs) -> NoReturn:
self._register_default(self.CHANNEL, **kwargs)
def register_role(self, **kwargs) -> NoReturn:
self._register_default(self.ROLE, **kwargs)
def register_member(self, **kwargs) -> NoReturn:
self._register_default(self.MEMBER, **kwargs)
def register_user(self, **kwargs) -> NoReturn:
self._register_default(self.USER, **kwargs)

View file

@ -0,0 +1,26 @@
from pathlib import Path
import appdirs
app_dir = appdirs.AppDirs("Tuxbot-bot")
config_dir = Path(app_dir.user_config_dir)
config_file = config_dir / "config.json"
def get_data_path(instance_name: str) -> Path:
return Path(app_dir.user_data_dir) / "data" / instance_name
def get_core_path(instance_name: str) -> Path:
data_path = get_data_path(instance_name)
return data_path / "data" / instance_name / "core"
def get_cogs_path(instance_name: str) -> Path:
data_path = get_data_path(instance_name)
return data_path / "data" / instance_name / "cogs"
def get_cog_path(instance_name: str, cog_name: str) -> Path:
data_path = get_data_path(instance_name)
return data_path / "data" / instance_name / "cogs" / cog_name

View file

@ -0,0 +1,89 @@
import codecs
import itertools
import sys
def bordered(*columns: dict) -> str:
"""
credits to https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/develop/redbot/core/utils/chat_formatting.py
Get two blocks of text in a borders.
Note
----
This will only work with a monospaced font.
Parameters
----------
*columns : `sequence` of `str`
The columns of text, each being a list of lines in that column.
Returns
-------
str
The bordered text.
"""
encoder = codecs.getencoder(sys.stdout.encoding)
try:
encoder("┌┐└┘─│") # border symbols
except UnicodeEncodeError:
ascii_border = True
else:
ascii_border = False
borders = {
"TL": "+" if ascii_border else "", # Top-left
"TR": "+" if ascii_border else "", # Top-right
"BL": "+" if ascii_border else "", # Bottom-left
"BR": "+" if ascii_border else "", # Bottom-right
"HZ": "-" if ascii_border else "", # Horizontal
"VT": "|" if ascii_border else "", # Vertical
}
sep = " " * 4 # Separator between boxes
widths = tuple(
max(
len(row) for row in column.get('rows')
) + 9
for column in columns
) # width of each col
cols_done = [False] * len(columns) # whether or not each column is done
lines = [""]
for i, column in enumerate(columns):
lines[0] += "{TL}" + "{HZ}" + column.get('title') \
+ "{HZ}" * (widths[i] - len(column.get('title')) - 1) \
+ "{TR}" + sep
for line in itertools.zip_longest(
*[column.get('rows') for column in columns]
):
row = []
for colidx, column in enumerate(line):
width = widths[colidx]
done = cols_done[colidx]
if column is None:
if not done:
# bottom border of column
column = "{HZ}" * width
row.append("{BL}" + column + "{BR}")
cols_done[colidx] = True # mark column as done
else:
# leave empty
row.append(" " * (width + 2))
else:
column += " " * (width - len(column)) # append padded spaces
row.append("{VT}" + column + "{VT}")
lines.append(sep.join(row))
final_row = []
for width, done in zip(widths, cols_done):
if not done:
final_row.append("{BL}" + "{HZ}" * width + "{BR}")
else:
final_row.append(" " * (width + 2))
lines.append(sep.join(final_row))
return "\n".join(lines).format(**borders)

View file

@ -0,0 +1,114 @@
import ast
import asyncio
import json
import os
import discord
from discord.ext import commands, flags
from configs.bot.protected import protected
from configs.bot.settings import prefixes
class ContextPlus(commands.Context):
async def send(self, content=None, *args, **kwargs):
if content is not None:
for value in protected:
content = content.replace(
str(value),
'[Deleted]'
)
if kwargs.get('content') is not None:
for value in protected:
kwargs['content'] = kwargs['content'].replace(
str(value),
'[Deleted]'
)
if kwargs.get('embeds') is not None and len(kwargs.get('embeds')) > 0:
for i, embed in enumerate(kwargs.get('embeds')):
embed = str(kwargs.get('embed').to_dict())
for value in protected:
embed = embed.replace(str(value), '[Deleted]')
kwargs['embeds'][i] = discord.Embed.from_dict(
ast.literal_eval(embed)
)
if kwargs.get('embed') is not None:
embed = str(kwargs.get('embed').to_dict())
for value in protected:
embed = embed.replace(str(value), '[Deleted]')
kwargs['embed'] = discord.Embed.from_dict(
ast.literal_eval(embed)
)
if (hasattr(self.command, 'deletable') and self.command.deletable) \
and kwargs.pop('deletable', True):
message = await super().send(content, *args, **kwargs)
await message.add_reaction('🗑')
def check(reaction: discord.Reaction, user: discord.User):
return user == self.author \
and str(reaction.emoji) == '🗑' \
and reaction.message.id == message.id
try:
await self.bot.wait_for(
'reaction_add',
timeout=60.0,
check=check
)
except asyncio.TimeoutError:
await message.remove_reaction('🗑', self.bot.user)
else:
await message.delete()
return message
else:
return await super().send(content, *args, **kwargs)
class CommandPLus(flags.FlagCommand):
def __init__(self, function, **kwargs):
super().__init__(function, **kwargs)
self.deletable = kwargs.pop("deletable", True)
def command_extra(*args, **kwargs):
return commands.command(*args, **kwargs, cls=CommandPLus)
class GroupPlus(flags.FlagGroup):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.deletable = kwargs.pop("deletable", True)
def group_extra(*args, **kwargs):
return commands.group(*args, **kwargs, cls=GroupPlus)
async def get_prefix(bot, message):
custom_prefix = prefixes
if message.guild:
path = f"configs/guilds/{str(message.guild.id)}.json"
if os.path.exists(path):
with open(path) as f:
datas = json.load(f)
custom_prefix = datas["Prefix"]
return commands.when_mentioned_or(*custom_prefix)(bot, message)
def get_owners() -> list:
with open("configs/bot/whitelist.json") as f:
datas = json.load(f)
return datas['owners']
def get_blacklist() -> dict:
with open("configs/bot/blacklist.json") as f:
return json.load(f)

View file

@ -0,0 +1,39 @@
import logging.handlers
import logging
import pathlib
import sys
MAX_OLD_LOGS = 8
MAX_BYTES = 5_000_000
def init_logging(level: int, location: pathlib.Path) -> None:
dpy_logger = logging.getLogger("discord")
dpy_logger.setLevel(logging.WARN)
dpy_logger_file = location / 'discord.log'
base_logger = logging.getLogger("tuxbot")
base_logger.setLevel(level)
base_logger_file = location / 'tuxbot.log'
formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}",
datefmt="%Y-%m-%d %H:%M:%S", style="{"
)
dpy_handler = logging.handlers.RotatingFileHandler(
str(dpy_logger_file.resolve()),
maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
)
base_handler = logging.handlers.RotatingFileHandler(
str(base_logger_file.resolve()),
maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
)
dpy_logger.addHandler(dpy_handler)
base_logger.addHandler(base_handler)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
base_logger.addHandler(stdout_handler)
dpy_logger.addHandler(stdout_handler)

292
build/lib/tuxbot/setup.py Normal file
View file

@ -0,0 +1,292 @@
import json
import logging
import re
import sys
from pathlib import Path
from typing import NoReturn, Union, List, Set
import click
from colorama import Fore, Style, init
from tuxbot.core.data_manager import config_dir, app_dir
init()
try:
config_dir.mkdir(parents=True, exist_ok=True)
except PermissionError:
print(f"mkdir: cannot create directory '{config_dir}': Permission denied")
sys.exit(1)
config_file = config_dir / "config.json"
def load_existing_config() -> dict:
if not config_file.exists():
return {}
with config_file.open() as fs:
return json.load(fs)
instances_data = load_existing_config()
if not instances_data:
instances_list = []
else:
instances_list = list(instances_data.keys())
def save_config(name, data, delete=False):
_config = load_existing_config()
if delete and name in _config:
_config.pop(name)
else:
_config[name] = data
with config_file.open("w") as fs:
json.dump(_config, fs, indent=4)
def get_name() -> str:
name = ""
while not name:
print(
"What name do you want to give this instance?\n"
"(valid characters: A-Z, a-z, 0-9, _, -)"
)
name = input("> ")
if re.fullmatch(r"[a-zA-Z0-9_\-]*", name) is None:
print()
print(
Fore.RED
+ "ERROR: Invalid characters provided"
+ Style.RESET_ALL
)
name = ""
return name
def get_data_dir(instance_name: str) -> Path:
data_path = Path(app_dir.user_data_dir) / "data" / instance_name
data_path_input = ""
print()
def make_data_dir(path: Path) -> Union[Path, str]:
try:
path.mkdir(parents=True, exist_ok=True)
except OSError:
print()
print(
Fore.RED
+ f"mkdir: cannot create directory '{path}':"
f" Permission denied"
+ Style.RESET_ALL
)
path = ""
return path
while not data_path_input:
print(
"where do you want to save the configurations?\n"
"Press [enter] to keep the default path"
)
print()
print(f"Default: {data_path}")
data_path_input = input("> ")
if data_path_input != '':
data_path_input = Path(data_path_input)
try:
exists = data_path_input.exists()
except OSError:
print()
print(
Fore.RED
+ "Impossible to verify the validity of the path, "
"make sure it does not contain any invalid characters."
+ Style.RESET_ALL
)
data_path_input = ""
exists = False
if data_path_input and not exists:
data_path_input = make_data_dir(data_path_input)
else:
data_path_input = make_data_dir(data_path)
print()
print(
f"You have chosen {data_path_input} to be your config directory for "
f"`{instance_name}` instance"
)
if not click.confirm("Please confirm", default=True):
print("Rerun the process to redo this configuration.")
sys.exit(0)
(data_path_input / 'core').mkdir(parents=True, exist_ok=True)
(data_path_input / 'cogs').mkdir(parents=True, exist_ok=True)
(data_path_input / 'logs').mkdir(parents=True, exist_ok=True)
return data_path_input
def get_token() -> str:
token = ""
while not token:
print(
"Please enter the bot token\n"
"(you can find it at https://discord.com/developers/applications)"
)
token = input("> ")
if re.fullmatch(r"([a-zA-Z0-9]{24}\.[a-zA-Z0-9_]{6}\.[a-zA-Z0-9_\-]{27}|mfa\.[a-zA-Z0-9_\-]{84})", token) is None:
print(
Fore.RED
+ "ERROR: Invalid token provided"
+ Style.RESET_ALL
)
token = ""
return token
def get_multiple(question: str, confirmation: str, value_type: type)\
-> Set[Union[str, int]]:
print(question)
values = [value_type(input('> '))]
while click.confirm(confirmation, default=False):
values.append(value_type(input('> ')))
return set(values)
def additional_config() -> dict:
p = Path(r'tuxbot/cogs').glob('**/additional_config.json')
datas = {}
for file in p:
print()
cog_name = str(file.parent).split('/')[-1]
datas[cog_name] = {}
with file.open('r') as f:
data = json.load(f)
print(f"\n==Configuration for `{cog_name}` module==")
for key, value in data.items():
print()
print(value['description'])
datas[cog_name][key] = input('> ')
return datas
def finish_setup(data_dir: Path) -> NoReturn:
print("Now, it's time to finish this setup by giving bot informations\n")
token = get_token()
print()
prefixes = get_multiple(
"Choice a (or multiple) prefix for the bot",
"Add another prefix ?",
str
)
mentionable = click.confirm(
"Does the bot answer if it's mentioned?",
default=True
)
owners_id = get_multiple(
"Give the owner id of this bot",
"Add another owner ?",
int
)
cogs_config = additional_config()
core_file = data_dir / 'core' / 'settings.json'
core = {
'token': token,
'prefixes': prefixes,
'mentionable': mentionable,
'owners_id': owners_id,
}
with core_file.open("w") as fs:
json.dump(core, fs, indent=4)
for cog, data in cogs_config.items():
data_cog_dir = data_dir / 'cogs' / cog
data_cog_dir.mkdir(parents=True, exist_ok=True)
data_cog_file = data_cog_dir / 'settings.json'
with data_cog_file.open("w") as fs:
json.dump(data, fs, indent=4)
def basic_setup() -> NoReturn:
print("Hi ! it's time for you to give me informations about you instance")
name = get_name()
data_dir = get_data_dir(name)
configs = load_existing_config()
instance_config = configs[name] if name in instances_list else {}
instance_config["DATA_PATH"] = str(data_dir.resolve())
instance_config["IS_RUNNING"] = False
if name in instances_list:
print()
print(
Fore.RED
+ f"WARNING: An instance named `{name}` already exists "
f"Continuing will overwrite this instance configs."
+ Style.RESET_ALL
)
if not click.confirm("Are you sure you want to continue?",
default=False):
print("Abandon...")
sys.exit(0)
save_config(name, instance_config)
print("\n"*4)
finish_setup(data_dir)
print()
print(
f"Instance successfully created! "
f"You can now run `tuxbot {name}` to launch this instance"
)
def setup():
try:
"""Create a new instance."""
level = logging.DEBUG
base_logger = logging.getLogger("tuxbot")
base_logger.setLevel(level)
formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}",
datefmt="%Y-%m-%d %H:%M:%S", style="{"
)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
base_logger.addHandler(stdout_handler)
basic_setup()
except KeyboardInterrupt:
print("Exiting...")
if __name__ == "__main__":
setup()

BIN
dist/Tuxbot_bot-3.0.0-py3.8.egg vendored Normal file

Binary file not shown.

View file

@ -12,6 +12,7 @@ python_requires = >=3.7
install_requires = install_requires =
aiohttp==3.6.2 aiohttp==3.6.2
aiosqlite==0.13.0 aiosqlite==0.13.0
appdirs==1.4.4
async-timeout==3.0.1 async-timeout==3.0.1
asyncpg==0.20.1 asyncpg==0.20.1
attrs==19.3.0 attrs==19.3.0
@ -19,6 +20,7 @@ install_requires =
certifi==2020.4.5.1 certifi==2020.4.5.1
chardet==3.0.4 chardet==3.0.4
ciso8601==2.1.3 ciso8601==2.1.3
click==7.1.2
colorama==0.4.3 colorama==0.4.3
discord-flags==2.1.1 discord-flags==2.1.1
discord.py==1.3.3 discord.py==1.3.3

View file

@ -10,5 +10,7 @@ version_info = VersionInfo(
releaselevel='alpha', build=build releaselevel='alpha', build=build
) )
__version__ = "v{}.{}.{}" \ __version__ = "v{}.{}.{}-{}.{}".format(
.format(version_info.major, version_info.minor, version_info.micro) version_info.major, version_info.minor, version_info.micro,
version_info.releaselevel, version_info.build
).replace('\n', '')

View file

@ -1,38 +1,104 @@
import argparse import argparse
import asyncio import asyncio
import getpass
import json
import logging import logging
import platform
import signal import signal
import sys import sys
from typing import NoReturn
import discord import discord
import pip
from colorama import Fore, init, Style from colorama import Fore, init, Style
from pip._vendor import distro
import tuxbot.logging import tuxbot.logging
from tuxbot.core import data_manager from tuxbot.core import data_manager
from tuxbot.core.bot import Tux
from tuxbot.core.utils.functions.cli import bordered
from . import __version__
log = logging.getLogger("tuxbot.main") log = logging.getLogger("tuxbot.main")
init() init()
def list_instances() -> NoReturn:
with data_manager.config_file.open() as fs:
datas = json.load(fs)
instances = list(datas.keys())
info = {
'title': "Instances",
'rows': []
}
for instance in instances:
info['rows'].append(f"-> {instance}")
print(bordered(info))
sys.exit(0)
def debug_info() -> NoReturn:
python_version = sys.version.replace('\n', '')
pip_version = pip.__version__
tuxbot_version = __version__
dpy_version = discord.__version__
os_info = distro.linux_distribution()
os_info = f"{os_info[0]} {os_info[1]}"
runner = getpass.getuser()
info = {
'title': "Debug Info",
'rows': [
f"Tuxbot version: {tuxbot_version}",
"",
f"Python version: {python_version}",
f"Python executable path: {sys.executable}",
f"Pip version: {pip_version}",
f"Discord.py version: {dpy_version}",
"",
f"OS info: {os_info}",
f"System arch: {platform.machine()}",
f"User: {runner}",
]
}
print(bordered(info))
sys.exit(0)
def parse_cli_flags(args): def parse_cli_flags(args):
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Tuxbot - OpenSource bot", description="Tuxbot - OpenSource bot",
usage="tuxbot <instance_name> [arguments]" usage="tuxbot <instance_name> [arguments]"
) )
parser.add_argument("--version", "-V", help="Show tuxbot's used version") parser.add_argument(
parser.add_argument("--list-instances", "-L", "--version", "-V",
help="List all instance names") action="store_true",
help="Show tuxbot's used version"
)
parser.add_argument(
"--debug",
action="store_true",
help="Show debug information."
)
parser.add_argument(
"--list-instances", "-L",
action="store_true",
help="List all instance names"
)
parser.add_argument( parser.add_argument(
"instance_name", nargs="?", "instance_name", nargs="?",
help="Name of the bot instance created during `redbot-setup`.") help="Name of the bot instance created during `tuxbot-setup`."
)
args = parser.parse_args(args) args = parser.parse_args(args)
if args.prefix:
args.prefix = sorted(args.prefix, reverse=True)
else:
args.prefix = []
return args return args
@ -42,10 +108,10 @@ async def shutdown_handler(tux, signal_type, exit_code=None):
sys.exit(0) sys.exit(0)
elif exit_code is None: elif exit_code is None:
log.info("Shutting down from unhandled exception") log.info("Shutting down from unhandled exception")
tux._shutdown_mode = 1 tux.shutdown_code = 1
if exit_code is not None: if exit_code is not None:
tux._shutdown_mode = exit_code tux.shutdown_code = exit_code
try: try:
await tux.logout() await tux.logout()
@ -60,8 +126,8 @@ async def shutdown_handler(tux, signal_type, exit_code=None):
await asyncio.gather(*pending, return_exceptions=True) await asyncio.gather(*pending, return_exceptions=True)
async def run_bot(tux: Tuxbot, cli_flags: argparse.Namespace) -> None: async def run_bot(tux: Tux, cli_flags: argparse.Namespace) -> None:
data_path = data_manager.get_data_path(tuxbot.instance_name) data_path = data_manager.get_data_path(tux.instance_name)
tuxbot.logging.init_logging( tuxbot.logging.init_logging(
level=cli_flags.logging_level, level=cli_flags.logging_level,
@ -74,7 +140,7 @@ async def run_bot(tux: Tuxbot, cli_flags: argparse.Namespace) -> None:
if cli_flags.token: if cli_flags.token:
token = cli_flags.token token = cli_flags.token
else: else:
token = await tux._config.token() token = await tux.config.token()
if not token: if not token:
log.critical("Token must be set if you want to login.") log.critical("Token must be set if you want to login.")
@ -92,18 +158,28 @@ async def run_bot(tux: Tuxbot, cli_flags: argparse.Namespace) -> None:
def main(): def main():
tux = None tux = None
cli_flags = parse_cli_flags(sys.argv[1:]) cli_flags = parse_cli_flags(sys.argv[1:])
if cli_flags.list_instances:
list_instances()
elif cli_flags.debug:
debug_info()
elif cli_flags.version:
print("Tuxbot V3")
print(f"Complete Version: {__version__}")
sys.exit(0)
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
if cli_flags.no_instance: if not cli_flags.instance_name:
print(Fore.RED print(Fore.RED
+ "No instance provided ! " + "No instance provided ! "
"You can use 'tuxbot -L' to list all available instances" "You can use 'tuxbot -L' to list all available instances"
+ Style.RESET_ALL) + Style.RESET_ALL)
sys.exit(1) sys.exit(1)
tux = Tuxbot( tux = Tux(
cli_flags=cli_flags, cli_flags=cli_flags,
description="Tuxbot, made from and for OpenSource", description="Tuxbot, made from and for OpenSource",
dm_help=None dm_help=None
@ -113,24 +189,24 @@ def main():
except KeyboardInterrupt: except KeyboardInterrupt:
log.warning("Please use <prefix>quit instead of Ctrl+C to Shutdown!") log.warning("Please use <prefix>quit instead of Ctrl+C to Shutdown!")
log.error("Received KeyboardInterrupt") log.error("Received KeyboardInterrupt")
if tuxbot is not None: if tux is not None:
loop.run_until_complete(shutdown_handler(tux, signal.SIGINT)) loop.run_until_complete(shutdown_handler(tux, signal.SIGINT))
except SystemExit as exc: except SystemExit as exc:
log.info("Shutting down with exit code: %s", exc.code) log.info("Shutting down with exit code: %s", exc.code)
if tuxbot is not None: if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, exc.code)) loop.run_until_complete(shutdown_handler(tux, None, exc.code))
except Exception as exc: except Exception as exc:
log.exception("Unexpected exception (%s): ", type(exc), exc_info=exc) log.exception("Unexpected exception (%s): ", type(exc), exc_info=exc)
if tuxbot is not None: if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, 1)) loop.run_until_complete(shutdown_handler(tux, None, 1))
finally: finally:
loop.run_until_complete(loop.shutdown_asyncgens()) loop.run_until_complete(loop.shutdown_asyncgens())
log.info("Please wait, cleaning up a bit more") log.info("Please wait, cleaning up a bit more")
loop.run_until_complete(asyncio.sleep(2)) loop.run_until_complete(asyncio.sleep(1))
asyncio.set_event_loop(None) asyncio.set_event_loop(None)
loop.stop() loop.stop()
loop.close() loop.close()
exit_code = 1 if tuxbot is None else tux._shutdown_mode exit_code = 1 if tux is None else tux.shutdown_code
sys.exit(exit_code) sys.exit(exit_code)

View file

@ -60,7 +60,7 @@ class Network(commands.Cog, name="Useless"):
ip_info = obj.lookup() ip_info = obj.lookup()
try: try:
handler = ipinfo.getHandler(self.bot._config.ipinfo) handler = ipinfo.getHandler(self.bot.config.ipinfo)
details = handler.getDetails(target) details = handler.getDetails(target)
api_result = True api_result = True
except (RequestQuotaExceededError, HTTPError): except (RequestQuotaExceededError, HTTPError):

View file

@ -2,6 +2,7 @@ from pathlib import Path
from discord.ext import commands from discord.ext import commands
from . import Config from . import Config
from . import data_manager
__all__ = ["Tux"] __all__ = ["Tux"]
@ -11,14 +12,15 @@ class Tux(commands.AutoShardedBot):
def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs): def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs):
# by default, if the bot shutdown without any intervention, # by default, if the bot shutdown without any intervention,
# it's a crash # it's a crash
self._shutdown_mode = 1 self.shutdown_code = 1
self._cli_flags = cli_flags self.cli_flags = cli_flags
self._last_exception = None self.instance_name = self.cli_flags.instance_name
self.last_exception = None
self._config = Config.register_core( self.config = Config(
identifier=self._cli_flags.instance_name data_manager.get_data_path(self.instance_name)
) )
self._config.register_global( self.config.register_global(
token=None, token=None,
prefix=[], prefix=[],
owner=None, owner=None,
@ -29,7 +31,7 @@ class Tux(commands.AutoShardedBot):
color=0x6E83D1, color=0x6E83D1,
disabled_commands=[] disabled_commands=[]
) )
self._config.register_guild( self.config.register_guild(
prefix=[], prefix=[],
whitelist=[], whitelist=[],
blacklist=[], blacklist=[],
@ -40,20 +42,24 @@ class Tux(commands.AutoShardedBot):
ignored=False, ignored=False,
disabled_commands=[] disabled_commands=[]
) )
self._config.register_channel( self.config.register_channel(
ignored=False ignored=False
) )
if "owner_ids" in kwargs: if "owner_ids" in kwargs:
kwargs["owner_ids"] = set(kwargs["owner_ids"]) kwargs["owner_ids"] = set(kwargs["owner_ids"])
else: else:
kwargs["owner_ids"] = self._config.owner_ids() kwargs["owner_ids"] = self.config.owner_ids()
message_cache_size = 100_000 message_cache_size = 100_000
kwargs["max_messages"] = message_cache_size kwargs["max_messages"] = message_cache_size
self._max_messages = message_cache_size self.max_messages = message_cache_size
self._uptime = None self.uptime = None
self._main_dir = bot_dir self.main_dir = bot_dir
print(str(self.cli_flags), self.instance_name, self.config, self.owner_ids, self.main_dir)
exit()
super().__init__(*args, help_command=None, **kwargs) super().__init__(*args, help_command=None, **kwargs)

View file

@ -1,151 +1,39 @@
import logging.handlers import logging.handlers
import logging
import pathlib import pathlib
import re
import sys import sys
from typing import List, Tuple, Optional
MAX_OLD_LOGS = 8 MAX_OLD_LOGS = 8
MAX_BYTES = 5_000_000
class RotatingFileHandler(logging.handlers.RotatingFileHandler):
"""Custom rotating file handler.
This file handler rotates a bit differently to the one in stdlib.
For a start, this works off of a "stem" and a "directory". The stem
is the base name of the log file, without the extension. The
directory is where all log files (including backups) will be placed.
Secondly, this logger rotates files downwards, and new logs are
*started* with the backup number incremented. The stdlib handler
rotates files upwards, and this leaves the logs in reverse order.
Thirdly, naming conventions are not customisable with this class.
Logs will initially be named in the format "{stem}.log", and after
rotating, the first log file will be renamed "{stem}-part1.log",
and a new file "{stem}-part2.log" will be created for logging to
continue.
A few things can't be modified in this handler: it must use append
mode, it doesn't support use of the `delay` arg, and it will ignore
custom namers and rotators.
When this handler is instantiated, it will search through the
directory for logs from previous runtimes, and will open the file
with the highest backup number to append to.
"""
def __init__(
self,
stem: str,
directory: pathlib.Path,
maxBytes: int = 0,
backupCount: int = 0,
encoding: Optional[str] = None,
) -> None:
self.baseStem = stem
self.directory = directory.resolve()
# Scan for existing files in directory, append to last part of existing log
log_part_re = re.compile(rf"{stem}-part(?P<partnum>\d+).log")
highest_part = 0
for path in directory.iterdir():
match = log_part_re.match(path.name)
if match and int(match["partnum"]) > highest_part:
highest_part = int(match["partnum"])
if highest_part:
filename = directory / f"{stem}-part{highest_part}.log"
else:
filename = directory / f"{stem}.log"
super().__init__(
filename,
mode="a",
maxBytes=maxBytes,
backupCount=backupCount,
encoding=encoding,
delay=False,
)
def doRollover(self):
if self.stream:
self.stream.close()
self.stream = None
initial_path = self.directory / f"{self.baseStem}.log"
if self.backupCount > 0 and initial_path.exists():
initial_path.replace(self.directory / f"{self.baseStem}-part1.log")
match = re.match(
rf"{self.baseStem}(?:-part(?P<part>\d+)?)?.log", pathlib.Path(self.baseFilename).name
)
latest_part_num = int(match.groupdict(default="1").get("part", "1"))
if self.backupCount < 1:
# No backups, just delete the existing log and start again
pathlib.Path(self.baseFilename).unlink()
elif latest_part_num > self.backupCount:
# Rotate files down one
# red-part2.log becomes red-part1.log etc, a new log is added at the end.
for i in range(1, self.backupCount):
next_log = self.directory / f"{self.baseStem}-part{i + 1}.log"
if next_log.exists():
prev_log = self.directory / f"{self.baseStem}-part{i}.log"
next_log.replace(prev_log)
else:
# Simply start a new file
self.baseFilename = str(
self.directory / f"{self.baseStem}-part{latest_part_num + 1}.log"
)
self.stream = self._open()
def init_logging(level: int, location: pathlib.Path) -> None: def init_logging(level: int, location: pathlib.Path) -> None:
dpy_logger = logging.getLogger("discord") dpy_logger = logging.getLogger("discord")
dpy_logger.setLevel(logging.WARNING) dpy_logger.setLevel(logging.WARN)
base_logger = logging.getLogger("red") dpy_logger_file = location / 'discord.log'
base_logger = logging.getLogger("tuxbot")
base_logger.setLevel(level) base_logger.setLevel(level)
base_logger_file = location / 'tuxbot.log'
formatter = logging.Formatter( formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}", datefmt="%Y-%m-%d %H:%M:%S", style="{" "[{asctime}] [{levelname}] {name}: {message}",
datefmt="%Y-%m-%d %H:%M:%S", style="{"
) )
dpy_handler = logging.handlers.RotatingFileHandler(
str(dpy_logger_file.resolve()),
maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
)
base_handler = logging.handlers.RotatingFileHandler(
str(base_logger_file.resolve()),
maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
)
dpy_logger.addHandler(dpy_handler)
base_logger.addHandler(base_handler)
stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter) stdout_handler.setFormatter(formatter)
base_logger.addHandler(stdout_handler) base_logger.addHandler(stdout_handler)
dpy_logger.addHandler(stdout_handler) dpy_logger.addHandler(stdout_handler)
if not location.exists():
location.mkdir(parents=True, exist_ok=True)
# Rotate latest logs to previous logs
previous_logs: List[pathlib.Path] = []
latest_logs: List[Tuple[pathlib.Path, str]] = []
for path in location.iterdir():
match = re.match(r"latest(?P<part>-part\d+)?\.log", path.name)
if match:
part = match.groupdict(default="")["part"]
latest_logs.append((path, part))
match = re.match(r"previous(?:-part\d+)?.log", path.name)
if match:
previous_logs.append(path)
# Delete all previous.log files
for path in previous_logs:
path.unlink()
# Rename latest.log files to previous.log
for path, part in latest_logs:
path.replace(location / f"previous{part}.log")
latest_fhandler = RotatingFileHandler(
stem="latest",
directory=location,
maxBytes=1_000_000, # About 1MB per logfile
backupCount=MAX_OLD_LOGS,
encoding="utf-8",
)
all_fhandler = RotatingFileHandler(
stem="red",
directory=location,
maxBytes=1_000_000,
backupCount=MAX_OLD_LOGS,
encoding="utf-8",
)
for fhandler in (latest_fhandler, all_fhandler):
fhandler.setFormatter(formatter)
base_logger.addHandler(fhandler)

View file

@ -155,14 +155,14 @@ def get_token() -> str:
def get_multiple(question: str, confirmation: str, value_type: type)\ def get_multiple(question: str, confirmation: str, value_type: type)\
-> Set[Union[str, int]]: -> List[Union[str, int]]:
print(question) print(question)
values = [value_type(input('> '))] values = [value_type(input('> '))]
while click.confirm(confirmation, default=False): while click.confirm(confirmation, default=False):
values.append(value_type(input('> '))) values.append(value_type(input('> ')))
return set(values) return values
def additional_config() -> dict: def additional_config() -> dict: