refactor(all): add black to all code

This commit is contained in:
Romain J 2020-06-06 18:51:47 +02:00
parent cdb891d435
commit 9869312ee8
19 changed files with 428 additions and 391 deletions

View file

@ -1,16 +1,15 @@
import subprocess
from collections import namedtuple
build = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']) \
.decode()
build = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]).decode()
VersionInfo = namedtuple('VersionInfo', 'major minor micro releaselevel build')
version_info = VersionInfo(
major=3, minor=0, micro=0,
releaselevel='alpha', build=build
)
VersionInfo = namedtuple("VersionInfo", "major minor micro releaselevel build")
version_info = VersionInfo(major=3, minor=0, micro=0, releaselevel="alpha", build=build)
__version__ = "v{}.{}.{}-{}.{}".format(
version_info.major, version_info.minor, version_info.micro,
version_info.releaselevel, version_info.build
).replace('\n', '')
version_info.major,
version_info.minor,
version_info.micro,
version_info.releaselevel,
version_info.build,
).replace("\n", "")

View file

@ -33,13 +33,10 @@ def list_instances() -> NoReturn:
instances = list(datas.keys())
info = {
'title': "Instances",
'rows': []
}
info = {"title": "Instances", "rows": []}
for instance in instances:
info['rows'].append(f"-> {instance}")
info["rows"].append(f"-> {instance}")
print(bordered(info))
sys.exit(0)
@ -49,7 +46,7 @@ def debug_info() -> NoReturn:
"""Show debug infos relatives to the bot
"""
python_version = sys.version.replace('\n', '')
python_version = sys.version.replace("\n", "")
pip_version = pip.__version__
tuxbot_version = __version__
dpy_version = discord.__version__
@ -60,8 +57,8 @@ def debug_info() -> NoReturn:
runner = getpass.getuser()
info = {
'title': "Debug Info",
'rows': [
"title": "Debug Info",
"rows": [
f"Tuxbot version: {tuxbot_version}",
"",
f"Python version: {python_version}",
@ -72,7 +69,7 @@ def debug_info() -> NoReturn:
f"OS info: {os_info}",
f"System arch: {platform.machine()}",
f"User: {runner}",
]
],
}
print(bordered(info))
@ -92,31 +89,20 @@ def parse_cli_flags(args: list) -> Namespace:
"""
parser = argparse.ArgumentParser(
description="Tuxbot - OpenSource bot",
usage="tuxbot <instance_name> [arguments]"
usage="tuxbot <instance_name> [arguments]",
)
parser.add_argument(
"--version", "-V",
action="store_true",
help="Show tuxbot's used version"
"--version", "-V", action="store_true", help="Show tuxbot's used version"
)
parser.add_argument("--debug", action="store_true", help="Show debug information.")
parser.add_argument(
"--debug",
action="store_true",
help="Show debug information."
"--list-instances", "-L", action="store_true", help="List all instance names"
)
parser.add_argument("--token", "-T", type=str, help="Run Tuxbot with passed token")
parser.add_argument(
"--list-instances", "-L",
action="store_true",
help="List all instance names"
)
parser.add_argument(
"--token", "-T",
type=str,
help="Run Tuxbot with passed token"
)
parser.add_argument(
"instance_name", nargs="?",
help="Name of the bot instance created during `tuxbot-setup`."
"instance_name",
nargs="?",
help="Name of the bot instance created during `tuxbot-setup`.",
)
args = parser.parse_args(args)
@ -151,9 +137,7 @@ async def shutdown_handler(tux: Tux, signal_type, exit_code=None) -> NoReturn:
try:
await tux.logout()
finally:
pending = [
t for t in asyncio.all_tasks() if t is not asyncio.current_task()
]
pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in pending:
task.cancel()
@ -178,10 +162,7 @@ async def run_bot(tux: Tux, cli_flags: Namespace) -> None:
"""
data_path = data_manager.data_path(tux.instance_name)
tuxbot.logging.init_logging(
10,
location=data_path / "logs"
)
tuxbot.logging.init_logging(10, location=data_path / "logs")
log.debug("====Basic Config====")
log.debug("Data Path: %s", data_path)
@ -189,7 +170,7 @@ async def run_bot(tux: Tux, cli_flags: Namespace) -> None:
if cli_flags.token:
token = cli_flags.token
else:
token = tux.config('core').get('token')
token = tux.config("core").get("token")
if not token:
log.critical("Token must be set if you want to login.")
@ -226,23 +207,26 @@ def main() -> NoReturn:
try:
if not cli_flags.instance_name:
print(Fore.RED
+ "No instance provided ! "
print(
Fore.RED + "No instance provided ! "
"You can use 'tuxbot -L' to list all available instances"
+ Style.RESET_ALL)
+ Style.RESET_ALL
)
sys.exit(ExitCodes.CRITICAL)
tux = Tux(
cli_flags=cli_flags,
description="Tuxbot, made from and for OpenSource",
dm_help=None
dm_help=None,
)
loop.run_until_complete(run_bot(tux, cli_flags))
except KeyboardInterrupt:
print(Fore.RED
print(
Fore.RED
+ "Please use <prefix>quit instead of Ctrl+C to Shutdown!"
+ Style.RESET_ALL)
+ Style.RESET_ALL
)
log.warning("Please use <prefix>quit instead of Ctrl+C to Shutdown!")
log.error("Received KeyboardInterrupt")
if tux is not None:

View file

@ -1,6 +1,18 @@
from .anti_raid import Warnings
from collections import namedtuple
from .anti_raid import AntiRaid
from ...core.bot import Tux
VersionInfo = namedtuple("VersionInfo", "major minor micro releaselevel")
version_info = VersionInfo(major=1, minor=0, micro=0, releaselevel="alpha")
__version__ = "v{}.{}.{}-{}".format(
version_info.major,
version_info.minor,
version_info.micro,
version_info.releaselevel
).replace("\n", "")
def setup(bot: Tux):
bot.add_cog(Warnings(bot))
bot.add_cog(AntiRaid(bot))

View file

@ -15,8 +15,8 @@ class AntiRaid(commands.Cog, name="AntiRaid"):
self.bot = bot
@commands.group(
name='anti_raid',
alias=['anti-raid', 'raid_protect', 'raid-protect', 'no_raid', 'no-raid']
name="anti_raid",
alias=["anti-raid", "raid_protect", "raid-protect", "no_raid", "no-raid"],
)
@commands.guild_only()
@checks.is_admin()

View file

@ -1,6 +1,18 @@
from collections import namedtuple
from .images import Images
from ...core.bot import Tux
VersionInfo = namedtuple("VersionInfo", "major minor micro releaselevel")
version_info = VersionInfo(major=1, minor=0, micro=0, releaselevel="alpha")
__version__ = "v{}.{}.{}-{}".format(
version_info.major,
version_info.minor,
version_info.micro,
version_info.releaselevel
).replace("\n", "")
def setup(bot: Tux):
bot.add_cog(Images(bot))

View file

@ -28,26 +28,34 @@ class Images(commands.Cog, name="Images"):
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
await ctx.send(file=discord.File(data, "output.png"))
@command_extra(name="phcomment")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _phcomment(self, ctx: ContextPlus, user: discord.User = None, *, message: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
async def _phcomment(
self,
ctx: ContextPlus,
user: discord.User = None,
*,
message: commands.clean_content(
fix_channel_mentions=True, escape_markdown=True
),
):
async with ctx.typing():
message = message.replace("&", "%26")
if user is None:
avatar = ctx.author.avatar_url_as(format='png')
avatar = ctx.author.avatar_url_as(format="png")
username = ctx.author.name
else:
avatar = user.avatar_url_as(format='png')
avatar = user.avatar_url_as(format="png")
username = user.name
url = f"{self.image_api}/ph/comment" \
f"?image={avatar}" \
f"&username={username}" \
url = (
f"{self.image_api}/ph/comment"
f"?image={avatar}"
f"&username={username}"
f"&message={message}"
)
async with self.bot.session.get(url) as r:
if r.status != 200:
@ -55,18 +63,25 @@ class Images(commands.Cog, name="Images"):
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
await ctx.send(file=discord.File(data, "output.png"))
@command_extra(name="phvideo")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _phvideo(self, ctx: ContextPlus, image: str, author: discord.User, *, title: commands.clean_content(fix_channel_mentions=True, escape_markdown=True)):
async def _phvideo(
self,
ctx: ContextPlus,
image: str,
author: discord.User,
*,
title: commands.clean_content(fix_channel_mentions=True, escape_markdown=True),
):
async with ctx.typing():
url = f"{self.image_api}/ph/video" \
f"?image={image}" \
f"&username={author.name}" \
url = (
f"{self.image_api}/ph/video"
f"?image={image}"
f"&username={author.name}"
f"&title={title}"
)
async with self.bot.session.get(url) as r:
if r.status != 200:
@ -74,9 +89,7 @@ class Images(commands.Cog, name="Images"):
data = BytesIO(await r.read())
await ctx.send(
file=discord.File(data, "output.png")
)
await ctx.send(file=discord.File(data, "output.png"))
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@ -88,7 +101,7 @@ class Images(commands.Cog, name="Images"):
passed_flags["text4"] = passed_flags.get("text1")
passed_flags["text5"] = passed_flags.get("text2")
await self._send_meme(ctx, 'balloon', **passed_flags)
await self._send_meme(ctx, "balloon", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@ -96,48 +109,48 @@ class Images(commands.Cog, name="Images"):
@command_extra(name="butterfly")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _butterfly(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'butterfly', **passed_flags)
await self._send_meme(ctx, "butterfly", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="buttons")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _buttons(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'buttons', **passed_flags)
await self._send_meme(ctx, "buttons", **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="cmm")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _cmm(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'change_my_mind', **passed_flags)
await self._send_meme(ctx, "change_my_mind", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="drake")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _drake(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'drake', **passed_flags)
await self._send_meme(ctx, "drake", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="fry")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _fry(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'fry', **passed_flags)
await self._send_meme(ctx, "fry", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="imagination")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _imagination(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'imagination', **passed_flags)
await self._send_meme(ctx, "imagination", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str, default=False)
@command_extra(name="everywhere")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _everywhere(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'everywhere', **passed_flags)
await self._send_meme(ctx, "everywhere", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@ -145,13 +158,13 @@ class Images(commands.Cog, name="Images"):
@command_extra(name="choice")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _choice(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'choice', **passed_flags)
await self._send_meme(ctx, "choice", **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="pika")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _pika(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'pika', **passed_flags)
await self._send_meme(ctx, "pika", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@ -159,17 +172,17 @@ class Images(commands.Cog, name="Images"):
@command_extra(name="pkp")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _pkp(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'pkp', **passed_flags)
await self._send_meme(ctx, "pkp", **passed_flags)
@flags.add_flag("--text1", type=str)
@flags.add_flag("--text2", type=str)
@command_extra(name="puppet")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _puppet(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'puppet', **passed_flags)
await self._send_meme(ctx, "puppet", **passed_flags)
@flags.add_flag("--text1", type=str)
@command_extra(name="scroll_of_truth", alias=['sot'])
@command_extra(name="scroll_of_truth", alias=["sot"])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _sot(self, ctx: ContextPlus, **passed_flags):
await self._send_meme(ctx, 'scroll_of_truth', **passed_flags)
await self._send_meme(ctx, "scroll_of_truth", **passed_flags)

View file

@ -1,10 +1,21 @@
import logging
from collections import namedtuple
from discord.ext import commands
from .logs import Logs, GatewayHandler, on_error
from ...core.bot import Tux
VersionInfo = namedtuple("VersionInfo", "major minor micro releaselevel")
version_info = VersionInfo(major=2, minor=0, micro=0, releaselevel="alpha")
__version__ = "v{}.{}.{}-{}".format(
version_info.major,
version_info.minor,
version_info.micro,
version_info.releaselevel
).replace("\n", "")
def setup(bot: Tux):
cog = Logs(bot)

View file

@ -30,16 +30,17 @@ class GatewayHandler(logging.Handler):
super().__init__(logging.INFO)
def filter(self, record):
return record.name == 'discord.gateway' \
or 'Shard ID' in record.msg \
or 'Websocket closed ' in record.msg
return (
record.name == "discord.gateway"
or "Shard ID" in record.msg
or "Websocket closed " in record.msg
)
def emit(self, record):
self.cog.add_record(record)
class Logs(commands.Cog):
def __init__(self, bot: TuxBot):
self.bot = bot
self.process = psutil.Process()
@ -54,15 +55,13 @@ class Logs(commands.Cog):
def _clear_gateway_data(self):
one_week_ago = datetime.datetime.utcnow() - datetime.timedelta(days=7)
to_remove = [
index for index, dt in enumerate(self._resumes)
if dt < one_week_ago
index for index, dt in enumerate(self._resumes) if dt < one_week_ago
]
for index in reversed(to_remove):
del self._resumes[index]
for shard_id, dates in self._identifies.items():
to_remove = [index for index, dt in enumerate(dates) if
dt < one_week_ago]
to_remove = [index for index, dt in enumerate(dates) if dt < one_week_ago]
for index in reversed(to_remove):
del dates[index]
@ -79,25 +78,28 @@ class Logs(commands.Cog):
self.bot.command_stats[command] += 1
message = ctx.message
if ctx.guild is None:
destination = 'Private Message'
destination = "Private Message"
guild_id = None
else:
destination = f'#{message.channel} ({message.guild})'
destination = f"#{message.channel} ({message.guild})"
guild_id = ctx.guild.id
log.info(
f'{message.created_at}: {message.author} '
f'in {destination}: {message.content}')
f"{message.created_at}: {message.author} "
f"in {destination}: {message.content}"
)
async with self._batch_lock:
self._data_batch.append({
'guild': guild_id,
'channel': ctx.channel.id,
'author': ctx.author.id,
'used': message.created_at.isoformat(),
'prefix': ctx.prefix,
'command': command,
'failed': ctx.command_failed,
})
self._data_batch.append(
{
"guild": guild_id,
"channel": ctx.channel.id,
"author": ctx.author.id,
"used": message.created_at.isoformat(),
"prefix": ctx.prefix,
"command": command,
"failed": ctx.command_failed,
}
)
@commands.Cog.listener()
async def on_command_completion(self, ctx):
@ -105,7 +107,7 @@ class Logs(commands.Cog):
@commands.Cog.listener()
async def on_socket_response(self, msg):
self.bot.socket_stats[msg.get('t')] += 1
self.bot.socket_stats[msg.get("t")] += 1
@property
def logs(self):
@ -113,48 +115,44 @@ class Logs(commands.Cog):
for key, value in self.bot.logs_channels.items():
webhooks[key] = discord.Webhook.partial(
id=value.get('webhook')['id'],
token=value.get('webhook')['token'],
adapter=discord.AsyncWebhookAdapter(
self.bot.session
)
id=value.get("webhook")["id"],
token=value.get("webhook")["token"],
adapter=discord.AsyncWebhookAdapter(self.bot.session),
)
return webhooks
async def log_error(self, *, ctx=None, extra=None):
e = discord.Embed(title='Error', colour=0xdd5f53)
e.description = f'```py\n{traceback.format_exc()}\n```'
e.add_field(name='Extra', value=extra, inline=False)
e = discord.Embed(title="Error", colour=0xDD5F53)
e.description = f"```py\n{traceback.format_exc()}\n```"
e.add_field(name="Extra", value=extra, inline=False)
e.timestamp = datetime.datetime.utcnow()
if ctx is not None:
fmt = '{0} (ID: {0.id})'
fmt = "{0} (ID: {0.id})"
author = fmt.format(ctx.author)
channel = fmt.format(ctx.channel)
guild = 'None' if ctx.guild is None else fmt.format(ctx.guild)
guild = "None" if ctx.guild is None else fmt.format(ctx.guild)
e.add_field(name='Author', value=author)
e.add_field(name='Channel', value=channel)
e.add_field(name='Guild', value=guild)
e.add_field(name="Author", value=author)
e.add_field(name="Channel", value=channel)
e.add_field(name="Guild", value=guild)
await self.logs.get('errors').send(embed=e)
await self.logs.get("errors").send(embed=e)
async def send_guild_stats(self, e, guild):
e.add_field(name='Name', value=guild.name)
e.add_field(name='ID', value=guild.id)
e.add_field(name='Shard ID', value=guild.shard_id or 'N/A')
e.add_field(name='Owner',
value=f'{guild.owner} (ID: {guild.owner.id})')
e.add_field(name="Name", value=guild.name)
e.add_field(name="ID", value=guild.id)
e.add_field(name="Shard ID", value=guild.shard_id or "N/A")
e.add_field(name="Owner", value=f"{guild.owner} (ID: {guild.owner.id})")
bots = sum(member.bot for member in guild.members)
total = guild.member_count
online = sum(member.status is discord.Status.online
for member in guild.members)
online = sum(member.status is discord.Status.online for member in guild.members)
e.add_field(name='Members', value=str(total))
e.add_field(name='Bots', value=f'{bots} ({bots / total:.2%})')
e.add_field(name='Online', value=f'{online} ({online / total:.2%})')
e.add_field(name="Members", value=str(total))
e.add_field(name="Bots", value=f"{bots} ({bots / total:.2%})")
e.add_field(name="Online", value=f"{online} ({online / total:.2%})")
if guild.icon:
e.set_thumbnail(url=guild.icon_url)
@ -162,16 +160,16 @@ class Logs(commands.Cog):
if guild.me:
e.timestamp = guild.me.joined_at
await self.logs.get('guilds').send(embed=e)
await self.logs.get("guilds").send(embed=e)
@commands.Cog.listener()
async def on_guild_join(self, guild: discord.guild):
e = discord.Embed(colour=0x53dda4, title='New Guild') # green colour
e = discord.Embed(colour=0x53DDA4, title="New Guild") # green colour
await self.send_guild_stats(e, guild)
@commands.Cog.listener()
async def on_guild_remove(self, guild: discord.guild):
e = discord.Embed(colour=0xdd5f53, title='Left Guild') # red colour
e = discord.Embed(colour=0xDD5F53, title="Left Guild") # red colour
await self.send_guild_stats(e, guild)
@commands.Cog.listener()
@ -185,61 +183,58 @@ class Logs(commands.Cog):
e = discord.Embed(
title=f"DM to: {message.channel.recipient}",
description=message.content,
color=0x39e326
color=0x39E326,
)
else:
e = discord.Embed(
title="New DM:",
description=message.content,
color=0x0A97F5
title="New DM:", description=message.content, color=0x0A97F5
)
e.set_author(
name=message.channel.recipient,
icon_url=message.channel.recipient.avatar_url_as(format="png")
icon_url=message.channel.recipient.avatar_url_as(format="png"),
)
if message.attachments:
attachment_url = message.attachments[0].url
e.set_image(url=attachment_url)
e.set_footer(
text=f"User ID: {message.channel.recipient.id}"
)
e.set_footer(text=f"User ID: {message.channel.recipient.id}")
await self.logs["dm"].send(embed=e)
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
await self.register_command(ctx)
if not isinstance(error, (
commands.CommandInvokeError, commands.ConversionError)):
if not isinstance(
error, (commands.CommandInvokeError, commands.ConversionError)
):
return
error = error.original
if isinstance(error, (discord.Forbidden, discord.NotFound)):
return
e = discord.Embed(title='Command Error', colour=0xcc3366)
e.add_field(name='Name', value=ctx.command.qualified_name)
e.add_field(name='Author', value=f'{ctx.author} (ID: {ctx.author.id})')
e = discord.Embed(title="Command Error", colour=0xCC3366)
e.add_field(name="Name", value=ctx.command.qualified_name)
e.add_field(name="Author", value=f"{ctx.author} (ID: {ctx.author.id})")
fmt = f'Channel: {ctx.channel} (ID: {ctx.channel.id})'
fmt = f"Channel: {ctx.channel} (ID: {ctx.channel.id})"
if ctx.guild:
fmt = f'{fmt}\nGuild: {ctx.guild} (ID: {ctx.guild.id})'
fmt = f"{fmt}\nGuild: {ctx.guild} (ID: {ctx.guild.id})"
e.add_field(name='Location', value=fmt, inline=False)
e.add_field(name='Content', value=textwrap.shorten(
ctx.message.content,
width=512
))
exc = ''.join(traceback.format_exception(
type(error), error, error.__traceback__,
chain=False)
e.add_field(name="Location", value=fmt, inline=False)
e.add_field(
name="Content", value=textwrap.shorten(ctx.message.content, width=512)
)
e.description = f'```py\n{exc}\n```'
exc = "".join(
traceback.format_exception(
type(error), error, error.__traceback__, chain=False
)
)
e.description = f"```py\n{exc}\n```"
e.timestamp = datetime.datetime.utcnow()
await self.logs.get('errors').send(embed=e)
await self.logs.get("errors").send(embed=e)
@commands.Cog.listener()
async def on_socket_raw_send(self, data):
@ -247,9 +242,9 @@ class Logs(commands.Cog):
return
back_to_json = json.loads(data)
if back_to_json['op'] == 2:
payload = back_to_json['d']
inner_shard = payload.get('shard', [0])
if back_to_json["op"] == 2:
payload = back_to_json["d"]
inner_shard = payload.get("shard", [0])
self._identifies[inner_shard[0]].append(datetime.datetime.utcnow())
else:
self._resumes.append(datetime.datetime.utcnow())
@ -260,17 +255,14 @@ class Logs(commands.Cog):
self._gateway_queue.put_nowait(record)
async def notify_gateway_status(self, record):
types = {
'INFO': ':information_source:',
'WARNING': ':warning:'
}
types = {"INFO": ":information_source:", "WARNING": ":warning:"}
emoji = types.get(record.levelname, ':heavy_multiplication_x:')
emoji = types.get(record.levelname, ":heavy_multiplication_x:")
dt = datetime.datetime.utcfromtimestamp(record.created)
msg = f'{emoji} `[{dt:%Y-%m-%d %H:%M:%S}] {record.message}`'
await self.logs.get('gateway').send(msg)
msg = f"{emoji} `[{dt:%Y-%m-%d %H:%M:%S}] {record.message}`"
await self.logs.get("gateway").send(msg)
@command_extra(name='commandstats')
@command_extra(name="commandstats")
@commands.is_owner()
async def _commandstats(self, ctx, limit=20):
counter = self.bot.command_stats
@ -281,11 +273,11 @@ class Logs(commands.Cog):
else:
common = counter.most_common()[limit:]
output = '\n'.join(f'{k:<{width}}: {c}' for k, c in common)
output = "\n".join(f"{k:<{width}}: {c}" for k, c in common)
await ctx.send(f'```\n{output}\n```')
await ctx.send(f"```\n{output}\n```")
@commands.command('socketstats')
@commands.command("socketstats")
@commands.is_owner()
async def _socketstats(self, ctx):
delta = datetime.datetime.utcnow() - self.bot.uptime
@ -293,31 +285,35 @@ class Logs(commands.Cog):
total = sum(self.bot.socket_stats.values())
cpm = total / minutes
await ctx.send(
f'{total} socket events observed ({cpm:.2f}/minute):\n'
f'{self.bot.socket_stats}')
f"{total} socket events observed ({cpm:.2f}/minute):\n"
f"{self.bot.socket_stats}"
)
@commands.command('uptime')
@commands.command("uptime")
async def _uptime(self, ctx):
uptime = humanize.naturaltime(
datetime.datetime.utcnow() - self.bot.uptime)
await ctx.send(f'Uptime: **{uptime}**')
uptime = humanize.naturaltime(datetime.datetime.utcnow() - self.bot.uptime)
await ctx.send(f"Uptime: **{uptime}**")
async def on_error(self, event, *args):
e = discord.Embed(title='Event Error', colour=0xa32952)
e.add_field(name='Event', value=event)
e.description = f'```py\n{traceback.format_exc()}\n```'
e = discord.Embed(title="Event Error", colour=0xA32952)
e.add_field(name="Event", value=event)
e.description = f"```py\n{traceback.format_exc()}\n```"
e.timestamp = datetime.datetime.utcnow()
args_str = ['```py']
args_str = ["```py"]
for index, arg in enumerate(args):
args_str.append(f'[{index}]: {arg!r}')
args_str.append('```')
e.add_field(name='Args', value='\n'.join(args_str), inline=False)
args_str.append(f"[{index}]: {arg!r}")
args_str.append("```")
e.add_field(name="Args", value="\n".join(args_str), inline=False)
hook = self.get_cog('Logs').logs.get('errors')
hook = self.get_cog("Logs").logs.get("errors")
try:
await hook.send(embed=e)
except (discord.HTTPException, discord.NotFound,
discord.Forbidden, discord.InvalidArgument):
except (
discord.HTTPException,
discord.NotFound,
discord.Forbidden,
discord.InvalidArgument,
):
pass

View file

@ -1,6 +1,18 @@
from collections import namedtuple
from .network import Network
from ...core.bot import Tux
VersionInfo = namedtuple("VersionInfo", "major minor micro releaselevel")
version_info = VersionInfo(major=2, minor=0, micro=0, releaselevel="alpha")
__version__ = "v{}.{}.{}-{}".format(
version_info.major,
version_info.minor,
version_info.micro,
version_info.releaselevel
).replace("\n", "")
def setup(bot: Tux):
bot.add_cog(Network(bot))

View file

@ -19,14 +19,13 @@ class Network(commands.Cog, name="Useless"):
def __init__(self, bot: TuxBot):
self.bot = bot
@flags.add_flag("-i", "--ip", type=str, default='v4',
choices=['v4', '4', 'v6', '6'])
@command_extra(name="iplocalise", aliases=['localiseip'])
@flags.add_flag(
"-i", "--ip", type=str, default="v4", choices=["v4", "4", "v6", "6"]
)
@command_extra(name="iplocalise", aliases=["localiseip"])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _iplocalise(self, ctx: ContextPlus, target: str, **passed_flags):
loading = await ctx.send(
"_Récupération des informations..._", deletable=False
)
loading = await ctx.send("_Récupération des informations..._", deletable=False)
def get_hostname(dtl, tgt):
try:
@ -35,25 +34,29 @@ class Network(commands.Cog, name="Useless"):
try:
return socket.gethostbyaddr(tgt)[0]
except (ValueError, socket.herror):
return 'N/A'
return "N/A"
ip_type = passed_flags.get('ip')
ip_type = passed_flags.get("ip")
target_copy = target
# clean https://, last /, ...
spltTgt = target.split("://")
target = spltTgt[
(0, 1)[len(spltTgt) > 1]
].split("?")[0].split('/')[0].split(':')[0].lower()
target = (
spltTgt[(0, 1)[len(spltTgt) > 1]]
.split("?")[0]
.split("/")[0]
.split(":")[0]
.lower()
)
try:
target = socket.getaddrinfo(
target, None,
socket.AF_INET if ip_type in ['v4', '4'] else socket.AF_INET6
target,
None,
socket.AF_INET if ip_type in ["v4", "4"] else socket.AF_INET6,
)[1][4][0]
except socket.gaierror:
return \
await ctx.send("Erreur, cette adresse n'est pas disponible.")
return await ctx.send("Erreur, cette adresse n'est pas disponible.")
net = Net(target)
obj = IPASN(net)
@ -70,34 +73,38 @@ class Network(commands.Cog, name="Useless"):
if api_result:
belongs = f"{details.org}"
osm = f"https://www.openstreetmap.org/" \
f"?mlat={details.latitude}" \
f"&mlon={details.longitude}" \
f"#map=5/{details.latitude}/{details.longitude}" \
osm = (
f"https://www.openstreetmap.org/"
f"?mlat={details.latitude}"
f"&mlon={details.longitude}"
f"#map=5/{details.latitude}/{details.longitude}"
f"&layers=H"
)
region = f"[{details.city} - {details.region} " \
f"({details.country})]({osm})"
flag = f"https://www.countryflags.io/" \
f"{details.country}/shiny/64.png"
region = (
f"[{details.city} - {details.region} " f"({details.country})]({osm})"
)
flag = f"https://www.countryflags.io/" f"{details.country}/shiny/64.png"
else:
belongs = f"{ip_info['asn_description']} (AS{ip_info['asn']})"
region = f"{ip_info['asn_country_code']}"
flag = f"https://www.countryflags.io/" \
flag = (
f"https://www.countryflags.io/"
f"{ip_info['asn_country_code']}/shiny/64.png"
)
e = discord.Embed(
title=f"**Information sur __{target_copy}__ :**"
f" `{target}`",
color=0x5858d7
title=f"**Information sur __{target_copy}__ :**" f" `{target}`",
color=0x5858D7,
)
e.add_field(name="Appartient à :", value=belongs)
e.add_field(name="RIR :", value=f"{ip_info['asn_registry']}")
e.add_field(name="Region :", value=region)
e.add_field(name="Nom de l'hôte :",
value=get_hostname(details, target), inline=False)
e.add_field(
name="Nom de l'hôte :", value=get_hostname(details, target), inline=False
)
e.set_thumbnail(url=flag)

View file

@ -1,6 +1,18 @@
from collections import namedtuple
from .warnings import Warnings
from ...core.bot import Tux
VersionInfo = namedtuple("VersionInfo", "major minor micro releaselevel")
version_info = VersionInfo(major=1, minor=0, micro=0, releaselevel="alpha")
__version__ = "v{}.{}.{}-{}".format(
version_info.major,
version_info.minor,
version_info.micro,
version_info.releaselevel
).replace("\n", "")
def setup(bot: Tux):
bot.add_cog(Warnings(bot))

View file

@ -11,7 +11,7 @@ class Warnings(commands.Cog, name="Warnings"):
def __init__(self, bot: Tux):
self.bot = bot
@commands.group(name='warn', alias=['warning'])
@commands.group(name="warn", alias=["warning"])
@commands.guild_only()
@checks.is_mod()
async def _warn(self, ctx: commands.Context):
@ -23,25 +23,18 @@ class Warnings(commands.Cog, name="Warnings"):
self,
ctx: commands.Context,
member: Union[discord.User, discord.Member],
reason: str
reason: str,
):
pass
@_warn.command(name="delete", aliases=["del", "remove"])
@commands.guild_only()
async def action_del(
self,
ctx: commands.Context,
warn_id: int,
reason: str = ""
):
async def action_del(self, ctx: commands.Context, warn_id: int, reason: str = ""):
pass
@_warn.command(name="list", aliases=["all"])
@commands.guild_only()
async def action_del(
self,
ctx: commands.Context,
member: Union[discord.User, discord.Member] = None
self, ctx: commands.Context, member: Union[discord.User, discord.Member] = None
):
pass

View file

@ -26,10 +26,7 @@ NAME = r"""
|_| \__,_/_/\_\_.__/ \___/ \__| |_.__/ \___/ \__|
"""
packages: List[str] = [
"jishaku",
"tuxbot.cogs.warnings"
]
packages: List[str] = ["jishaku", "tuxbot.cogs.warnings"]
class Tux(commands.AutoShardedBot):
@ -47,11 +44,11 @@ class Tux(commands.AutoShardedBot):
self.config = Config(self.instance_name)
async def _prefixes(bot, message) -> List[str]:
prefixes = self.config('core').get('prefixes')
prefixes = self.config("core").get("prefixes")
prefixes.extend(self.config.get_prefixes(message.guild))
if self.config('core').get('mentionable'):
if self.config("core").get("mentionable"):
return commands.when_mentioned_or(*prefixes)(bot, message)
return prefixes
@ -79,41 +76,37 @@ class Tux(commands.AutoShardedBot):
try:
self.load_extension(package)
except Exception as e:
print(Fore.RED
print(
Fore.RED
+ f"Failed to load package {package}"
+ Style.RESET_ALL
+ f" check "
f"{str((self.logs / 'tuxbot.log').resolve())} "
f"for more details")
log.exception(
f"Failed to load package {package}",
exc_info=e
f"for more details"
)
log.exception(f"Failed to load package {package}", exc_info=e)
async def on_ready(self):
self.uptime = datetime.datetime.now()
INFO = {
'title': "INFO",
'rows': [
"title": "INFO",
"rows": [
str(self.user),
f"Prefixes: {', '.join(self.config('core').get('prefixes'))}",
f"Language: {self.config('core').get('locale')}",
f"Tuxbot Version: {__version__}",
f"Discord.py Version: {discord.__version__}",
"Python Version: " + sys.version.replace('\n', ''),
"Python Version: " + sys.version.replace("\n", ""),
f"Shards: {self.shard_count}",
f"Servers: {len(self.guilds)}",
f"Users: {len(self.users)}"
]
f"Users: {len(self.users)}",
],
}
COGS = {
'title': "COGS",
'rows': []
}
COGS = {"title": "COGS", "rows": []}
for extension in packages:
COGS['rows'].append(
COGS["rows"].append(
f"[{'X' if extension in self.extensions else ' '}] {extension}"
)
@ -142,7 +135,7 @@ class Tux(commands.AutoShardedBot):
app = await self.application_info()
if app.team:
ids = [m.id for m in app.team.members]
self.config.update('core', 'owners_id', ids)
await self.config.update("core", "owners_id", ids)
owner = user.id in ids
self._app_owners_fetched = True
@ -158,9 +151,11 @@ class Tux(commands.AutoShardedBot):
if message.author.bot:
return
if message.guild.id in self.config.get_blacklist('guild') \
or message.channel.id in self.config.get_blacklist('channel') \
or message.author.id in self.config.get_blacklist('user'):
if (
message.guild.id in self.config.get_blacklist("guild")
or message.channel.id in self.config.get_blacklist("channel")
or message.author.id in self.config.get_blacklist("user")
):
return
ctx = await self.get_context(message)

View file

@ -25,6 +25,7 @@ def is_mod():
"""Is the user a moderator ?
"""
async def pred(ctx):
if await ctx.bot.is_owner(ctx.author):
return True
@ -38,6 +39,7 @@ def is_admin():
"""Is the user admin ?
"""
async def pred(ctx):
if await ctx.bot.is_owner(ctx.author):
return True
@ -64,9 +66,7 @@ async def check_permissions(ctx: "ContextPlus", **perms: Dict[str, bool]):
return False
resolved = ctx.channel.permissions_for(ctx.author)
return all(
getattr(resolved, name, None) == value for name, value in perms.items()
)
return all(getattr(resolved, name, None) == value for name, value in perms.items())
def guild_owner_or_permissions(**perms: Dict[str, bool]):
@ -77,6 +77,7 @@ def guild_owner_or_permissions(**perms: Dict[str, bool]):
**perms:dict
Perms to verify.
"""
async def pred(ctx):
if ctx.author is ctx.guild.owner:
return True

View file

@ -1,3 +1,4 @@
import asyncio
import json
import logging
from typing import List, Dict, Union, Any
@ -12,27 +13,31 @@ log = logging.getLogger("tuxbot.core.config")
class Config:
def __init__(
self,
cog_instance: str = None
):
def __init__(self, cog_instance: str = None):
self._cog_instance = cog_instance
self.lock = asyncio.Lock()
self.loop = asyncio.get_event_loop()
self._settings_file = None
self._datas = {}
def __getitem__(self, item) -> Dict:
path = data_path(self._cog_instance)
if item != 'core':
path = path / 'cogs' / item
if item != "core":
path = path / "cogs" / item
else:
path /= 'core'
path /= "core"
settings_file = path / 'settings.json'
settings_file = path / "settings.json"
if not settings_file.exists():
raise FileNotFoundError(f"Unable to find settings file "
f"'{settings_file}'")
raise FileNotFoundError(
f"Unable to find settings file " f"'{settings_file}'"
)
else:
with settings_file.open('r') as f:
with settings_file.open("r") as f:
return json.load(f)
def __call__(self, item):
@ -46,7 +51,7 @@ class Config:
str
Owners id.
"""
return self.__getitem__('core').get('owners_id')
return self.__getitem__("core").get("owners_id")
def token(self) -> str:
"""Simply return the bot token saved in config file.
@ -56,7 +61,7 @@ class Config:
str
Bot token.
"""
return self.__getitem__('core').get('token')
return self.__getitem__("core").get("token")
def get_prefixes(self, guild: discord.Guild) -> List[str]:
"""Get custom prefixes for one guild.
@ -71,11 +76,8 @@ class Config:
List[str]
List of all prefixes.
"""
core = self.__getitem__('core')
prefixes = core \
.get('guild', {}) \
.get(guild.id, {}) \
.get('prefixes', [])
core = self.__getitem__("core")
prefixes = core.get("guild", {}).get(guild.id, {}).get("prefixes", [])
return prefixes
@ -92,14 +94,16 @@ class Config:
List[Union[str, int]]
List containing blacklisted values.
"""
core = self.__getitem__('core')
blacklist = core \
.get('blacklist', {}) \
.get(key, [])
core = self.__getitem__("core")
blacklist = core.get("blacklist", {}).get(key, [])
return blacklist
def update(self, cog_name: str, item: str, value: Any) -> dict:
def _dump(self):
with self._settings_file.open("w") as f:
json.dump(self._datas, f, indent=4)
async def update(self, cog_name: str, item: str, value: Any) -> dict:
"""Update values in config file.
Parameters
@ -122,14 +126,16 @@ class Config:
datas[item] = value
if cog_name != 'core':
path = path / 'cogs' / cog_name
self._datas = datas
if cog_name != "core":
path = path / "cogs" / cog_name
else:
path /= 'core'
path /= "core"
settings_file = path / 'settings.json'
self._settings_file = path / "settings.json"
with settings_file.open('w') as f:
json.dump(datas, f, indent=4)
async with self.lock:
await self.loop.run_in_executor(None, self._dump)
return datas

View file

@ -43,22 +43,22 @@ def bordered(*columns: dict) -> str:
sep = " " * 4 # Separator between boxes
widths = tuple(
max(
len(row) for row in column.get('rows')
) + 9
for column in columns
max(len(row) for row in column.get("rows")) + 9 for column in columns
) # width of each col
cols_done = [False] * len(columns) # whether or not each column is done
lines = [""]
for i, column in enumerate(columns):
lines[0] += "{TL}" + "{HZ}" + column.get('title') \
+ "{HZ}" * (widths[i] - len(column.get('title')) - 1) \
+ "{TR}" + sep
lines[0] += (
"{TL}"
+ "{HZ}"
+ column.get("title")
+ "{HZ}" * (widths[i] - len(column.get("title")) - 1)
+ "{TR}"
+ sep
)
for line in itertools.zip_longest(
*[column.get('rows') for column in columns]
):
for line in itertools.zip_longest(*[column.get("rows") for column in columns]):
row = []
for colidx, column in enumerate(line):
width = widths[colidx]

View file

@ -6,25 +6,23 @@ from discord.ext import commands, flags
class ContextPlus(commands.Context):
async def send(self, content=None, *args, **kwargs):
if (hasattr(self.command, 'deletable')
and self.command.deletable) \
and kwargs.pop('deletable', True):
if (
hasattr(self.command, "deletable") and self.command.deletable
) and kwargs.pop("deletable", True):
message = await super().send(content, *args, **kwargs)
await message.add_reaction('🗑')
await message.add_reaction("🗑")
def check(reaction: discord.Reaction, user: discord.User):
return user == self.author \
and str(reaction.emoji) == '🗑' \
return (
user == self.author
and str(reaction.emoji) == "🗑"
and reaction.message.id == message.id
)
try:
await self.bot.wait_for(
'reaction_add',
timeout=60.0,
check=check
)
await self.bot.wait_for("reaction_add", timeout=60.0, check=check)
except asyncio.TimeoutError:
await message.remove_reaction('🗑', self.bot.user)
await message.remove_reaction("🗑", self.bot.user)
else:
await message.delete()
return message

View file

@ -20,24 +20,23 @@ def init_logging(level: int, location: pathlib.Path) -> None:
dpy_logger = logging.getLogger("discord")
dpy_logger.setLevel(logging.WARN)
dpy_logger_file = location / 'discord.log'
dpy_logger_file = location / "discord.log"
base_logger = logging.getLogger("tuxbot")
base_logger.setLevel(level)
base_logger_file = location / 'tuxbot.log'
base_logger_file = location / "tuxbot.log"
formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}",
datefmt="%Y-%m-%d %H:%M:%S", style="{"
datefmt="%Y-%m-%d %H:%M:%S",
style="{",
)
dpy_handler = logging.handlers.RotatingFileHandler(
str(dpy_logger_file.resolve()),
maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
str(dpy_logger_file.resolve()), maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
)
base_handler = logging.handlers.RotatingFileHandler(
str(base_logger_file.resolve()),
maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
str(base_logger_file.resolve()), maxBytes=MAX_BYTES, backupCount=MAX_OLD_LOGS
)
stdout_handler = logging.StreamHandler(sys.stdout)

View file

@ -84,11 +84,7 @@ def get_name() -> str:
name = input("> ")
if re.fullmatch(r"[a-zA-Z0-9_\-]*", name) is None:
print()
print(
Fore.RED
+ "ERROR: Invalid characters provided"
+ Style.RESET_ALL
)
print(Fore.RED + "ERROR: Invalid characters provided" + Style.RESET_ALL)
name = ""
return name
@ -117,10 +113,8 @@ def get_data_dir(instance_name: str) -> Path:
except OSError:
print()
print(
Fore.RED
+ f"mkdir: cannot create directory '{path}':"
f" Permission denied"
+ Style.RESET_ALL
Fore.RED + f"mkdir: cannot create directory '{path}':"
f" Permission denied" + Style.RESET_ALL
)
path = ""
@ -136,7 +130,7 @@ def get_data_dir(instance_name: str) -> Path:
data_path_input = input("> ")
if data_path_input != '':
if data_path_input != "":
data_path_input = Path(data_path_input)
try:
@ -144,8 +138,7 @@ def get_data_dir(instance_name: str) -> Path:
except OSError:
print()
print(
Fore.RED
+ "Impossible to verify the validity of the path, "
Fore.RED + "Impossible to verify the validity of the path, "
"make sure it does not contain any invalid characters."
+ Style.RESET_ALL
)
@ -167,9 +160,9 @@ def get_data_dir(instance_name: str) -> Path:
print("Rerun the process to redo this configuration.")
sys.exit(0)
(data_path_input / 'core').mkdir(parents=True, exist_ok=True)
(data_path_input / 'cogs').mkdir(parents=True, exist_ok=True)
(data_path_input / 'logs').mkdir(parents=True, exist_ok=True)
(data_path_input / "core").mkdir(parents=True, exist_ok=True)
(data_path_input / "cogs").mkdir(parents=True, exist_ok=True)
(data_path_input / "logs").mkdir(parents=True, exist_ok=True)
return data_path_input
@ -190,18 +183,21 @@ def get_token() -> str:
"(you can find it at https://discord.com/developers/applications)"
)
token = input("> ")
if re.fullmatch(r"([a-zA-Z0-9]{24}\.[a-zA-Z0-9_]{6}\.[a-zA-Z0-9_\-]{27}|mfa\.[a-zA-Z0-9_\-]{84})", token) is None:
print(
Fore.RED
+ "ERROR: Invalid token provided"
+ Style.RESET_ALL
if (
re.fullmatch(
r"([a-zA-Z0-9]{24}\.[a-zA-Z0-9_]{6}\.[a-zA-Z0-9_\-]{27}|mfa\.[a-zA-Z0-9_\-]{84})",
token,
)
is None
):
print(Fore.RED + "ERROR: Invalid token provided" + Style.RESET_ALL)
token = ""
return token
def get_multiple(question: str, confirmation: str, value_type: type)\
-> List[Union[str, int]]:
def get_multiple(
question: str, confirmation: str, value_type: type
) -> List[Union[str, int]]:
"""Give possibility to user to fill multiple value.
Parameters
@ -219,14 +215,14 @@ def get_multiple(question: str, confirmation: str, value_type: type)\
List containing user filled values.
"""
print(question)
user_input = input('> ')
user_input = input("> ")
if not user_input:
return []
values = [user_input]
while click.confirm(confirmation, default=False):
values.append(value_type(input('> ')))
values.append(value_type(input("> ")))
return values
@ -239,23 +235,23 @@ def additional_config() -> dict:
dict:
Dict with cog name as key and configs as value.
"""
p = Path(r'tuxbot/cogs').glob('**/additional_config.json')
p = Path(r"tuxbot/cogs").glob("**/additional_config.json")
datas = {}
for file in p:
print()
cog_name = str(file.parent).split('/')[-1]
cog_name = str(file.parent).split("/")[-1]
datas[cog_name] = {}
with file.open('r') as f:
with file.open("r") as f:
data = json.load(f)
print(f"\n==Configuration for `{cog_name}` module==")
for key, value in data.items():
print()
print(value['description'])
datas[cog_name][key] = input('> ')
print(value["description"])
datas[cog_name][key] = input("> ")
return datas
@ -273,40 +269,33 @@ def finish_setup(data_dir: Path) -> NoReturn:
token = get_token()
print()
prefixes = get_multiple(
"Choice a (or multiple) prefix for the bot",
"Add another prefix ?",
str
)
mentionable = click.confirm(
"Does the bot answer if it's mentioned?",
default=True
"Choice a (or multiple) prefix for the bot", "Add another prefix ?", str
)
mentionable = click.confirm("Does the bot answer if it's mentioned?", default=True)
owners_id = get_multiple(
"Give the owner id of this bot",
"Add another owner ?",
int
"Give the owner id of this bot", "Add another owner ?", int
)
cogs_config = additional_config()
core_file = data_dir / 'core' / 'settings.json'
core_file = data_dir / "core" / "settings.json"
core = {
'token': token,
'prefixes': prefixes,
'mentionable': mentionable,
'owners_id': owners_id,
'locale': "en-US"
"token": token,
"prefixes": prefixes,
"mentionable": mentionable,
"owners_id": owners_id,
"locale": "en-US",
}
with core_file.open("w") as fs:
json.dump(core, fs, indent=4)
for cog, data in cogs_config.items():
data_cog_dir = data_dir / 'cogs' / cog
data_cog_dir = data_dir / "cogs" / cog
data_cog_dir.mkdir(parents=True, exist_ok=True)
data_cog_file = data_cog_dir / 'settings.json'
data_cog_file = data_cog_dir / "settings.json"
with data_cog_file.open("w") as fs:
json.dump(data, fs, indent=4)
@ -330,13 +319,10 @@ def basic_setup() -> NoReturn:
if name in instances_list:
print()
print(
Fore.RED
+ f"WARNING: An instance named `{name}` already exists "
f"Continuing will overwrite this instance configs."
+ Style.RESET_ALL
Fore.RED + f"WARNING: An instance named `{name}` already exists "
f"Continuing will overwrite this instance configs." + Style.RESET_ALL
)
if not click.confirm("Are you sure you want to continue?",
default=False):
if not click.confirm("Are you sure you want to continue?", default=False):
print("Abandon...")
sys.exit(0)
@ -361,7 +347,8 @@ def setup() -> NoReturn:
base_logger.setLevel(level)
formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}",
datefmt="%Y-%m-%d %H:%M:%S", style="{"
datefmt="%Y-%m-%d %H:%M:%S",
style="{",
)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)