workingOn(conf): -

This commit is contained in:
Romain J 2020-09-02 00:08:06 +02:00
parent d68d54be44
commit e0788137ff
14 changed files with 494 additions and 659 deletions

View file

@ -11,14 +11,16 @@ packages = find_namespace:
python_requires = >=3.7 python_requires = >=3.7
;todo: remove flatten_dict (core/config.py) ;todo: remove flatten_dict (core/config.py)
install_requires = install_requires =
appdirs==1.4.4 appdirs>=1.4.4
Babel==2.8.0 Babel>=2.8.0
discord.py==1.4.1 discord.py==1.4.1
discord_flags==2.1.1 discord_flags==2.1.1
flatten_dict==0.3.0 flatten_dict>=0.3.0
jishaku==1.19.1.200 jishaku>=1.19.1.200
PyYAML==5.3.1 psutil>=5.7.2
rich==6.0.0 PyYAML>=5.3.1
rich>=6.0.0
structured_config>=4.12
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =

View file

@ -1,318 +1,26 @@
import argparse
import asyncio
import json
import logging
import signal
import sys
import os
from argparse import Namespace
from typing import NoReturn from typing import NoReturn
import discord
import pip
import tracemalloc
from rich.columns import Columns
from rich.console import Console from rich.console import Console
from rich.panel import Panel
from rich.traceback import install from rich.traceback import install
from rich.table import Table, box from tuxbot import ExitCodes
from rich.text import Text
from rich import print
import tuxbot.logging
from tuxbot.core import data_manager
from tuxbot.core.bot import Tux
from . import __version__, version_info, ExitCodes
log = logging.getLogger("tuxbot.main")
console = Console() console = Console()
install(console=console) install(console=console)
tracemalloc.start()
def list_instances() -> NoReturn:
"""List all available instances
"""
with data_manager.config_file.open() as fs:
data = json.load(fs)
console.print(
Panel("[bold green]Instances", style="green"),
justify="center"
)
console.print()
columns = Columns(expand=True, padding=2, align="center")
for instance, details in data.items():
is_running = details.get('IS_RUNNING')
table = Table(
style="dim", border_style="not dim",
box=box.HEAVY_HEAD
)
table.add_column("Name")
table.add_column(("Running" if is_running else "Down") + " since")
table.add_row(instance, "42")
table.title = Text(
instance,
style="green" if is_running else "red"
)
columns.add_renderable(table)
console.print(columns)
console.print()
sys.exit(os.EX_OK)
def debug_info() -> NoReturn:
"""Show debug info relatives to the bot
"""
python_version = sys.version.replace("\n", "")
pip_version = pip.__version__
tuxbot_version = __version__
dpy_version = discord.__version__
uptime = os.popen('uptime').read().strip().split()
console.print(
Panel("[bold blue]Debug Info", style="blue"),
justify="center"
)
console.print()
columns = Columns(expand=True, padding=2, align="center")
table = Table(
style="dim", border_style="not dim",
box=box.HEAVY_HEAD
)
table.add_column(
"Bot Info",
)
table.add_row(f"[u]Tuxbot version:[/u] {tuxbot_version}")
table.add_row(f"[u]Major:[/u] {version_info.major}")
table.add_row(f"[u]Minor:[/u] {version_info.minor}")
table.add_row(f"[u]Micro:[/u] {version_info.micro}")
table.add_row(f"[u]Level:[/u] {version_info.releaselevel}")
table.add_row(f"[u]Last change:[/u] {version_info.info}")
columns.add_renderable(table)
table = Table(
style="dim", border_style="not dim",
box=box.HEAVY_HEAD
)
table.add_column(
"Python Info",
)
table.add_row(f"[u]Python version:[/u] {python_version}")
table.add_row(f"[u]Python executable path:[/u] {sys.executable}")
table.add_row(f"[u]Pip version:[/u] {pip_version}")
table.add_row(f"[u]Discord.py version:[/u] {dpy_version}")
columns.add_renderable(table)
table = Table(
style="dim", border_style="not dim",
box=box.HEAVY_HEAD
)
table.add_column(
"Server Info",
)
table.add_row(f"[u]System:[/u] {os.uname().sysname}")
table.add_row(f"[u]System arch:[/u] {os.uname().machine}")
table.add_row(f"[u]Kernel:[/u] {os.uname().release}")
table.add_row(f"[u]User:[/u] {os.getlogin()}")
table.add_row(f"[u]Uptime:[/u] {uptime[2]}")
table.add_row(
f"[u]Load Average:[/u] {' '.join(map(str, os.getloadavg()))}"
)
columns.add_renderable(table)
console.print(columns)
console.print()
sys.exit(os.EX_OK)
def parse_cli_flags(args: list) -> Namespace:
"""Parser for cli values.
Parameters
----------
args:list
Is a list of all passed values.
Returns
-------
Namespace
"""
parser = argparse.ArgumentParser(
description="Tuxbot - OpenSource bot",
usage="tuxbot <instance_name> [arguments]",
)
parser.add_argument(
"--version", "-V", action="store_true",
help="Show tuxbot's used version"
)
parser.add_argument(
"--debug", action="store_true",
help="Show debug information."
)
parser.add_argument(
"--list-instances", "-L", action="store_true",
help="List all instance names"
)
parser.add_argument("--token", "-T", type=str,
help="Run Tuxbot with passed token")
parser.add_argument(
"instance_name",
nargs="?",
help="Name of the bot instance created during `tuxbot-setup`.",
)
args = parser.parse_args(args)
return args
async def shutdown_handler(tux: Tux, signal_type, exit_code=None) -> NoReturn:
"""Handler when the bot shutdown
It cancels all running task.
Parameters
----------
tux:Tux
Object for the bot.
signal_type:int, None
Exiting signal code.
exit_code:None|int
Code to show when exiting.
"""
if signal_type:
log.info("%s received. Quitting...", signal_type)
elif exit_code is None:
log.info("Shutting down from unhandled exception")
tux.shutdown_code = ExitCodes.CRITICAL
if exit_code is not None:
tux.shutdown_code = exit_code
await tux.shutdown()
async def run_bot(tux: Tux, cli_flags: Namespace) -> None:
"""This run the bot.
Parameters
----------
tux:Tux
Object for the bot.
cli_flags:Namespace
All different flags passed in the console.
Returns
-------
None
When exiting, this function return None.
"""
data_path = data_manager.data_path(tux.instance_name)
tuxbot.logging.init_logging(10, location=data_path / "logs")
log.debug("====Basic Config====")
log.debug("Data Path: %s", data_path)
if cli_flags.token:
token = cli_flags.token
else:
token = tux.config("core").get("token")
if not token:
log.critical("Token must be set if you want to login.")
sys.exit(ExitCodes.CRITICAL)
try:
await tux.load_packages()
console.print()
await tux.start(token=token, bot=True)
except discord.LoginFailure:
log.critical("This token appears to be valid.")
console.print()
console.print(
"[prompt.invalid]This token appears to be valid. [i]exiting...[/i]"
)
sys.exit(ExitCodes.CRITICAL)
except Exception as e:
raise e
return None
def main() -> NoReturn: def main() -> NoReturn:
"""Main function
"""
tux = None
cli_flags = parse_cli_flags(sys.argv[1:])
if cli_flags.list_instances:
list_instances()
elif cli_flags.debug:
debug_info()
elif cli_flags.version:
print(f"Tuxbot V{version_info.major}")
print(f"Complete Version: {__version__}")
sys.exit(os.EX_OK)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
if not cli_flags.instance_name: from .__run__ import run
console.print(
"[red]No instance provided ! "
"You can use 'tuxbot -L' to list all available instances"
)
sys.exit(ExitCodes.CRITICAL)
tux = Tux( run()
cli_flags=cli_flags,
description="Tuxbot, made from and for OpenSource",
dm_help=None,
)
loop.run_until_complete(run_bot(tux, cli_flags))
except KeyboardInterrupt:
console.print(
" [red]Please use <prefix>quit instead of Ctrl+C to Shutdown!"
)
log.warning("Please use <prefix>quit instead of Ctrl+C to Shutdown!")
log.info("Received KeyboardInterrupt")
console.print("[i]Trying to shutdown...")
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, signal.SIGINT))
except SystemExit as exc: except SystemExit as exc:
log.info("Shutting down with exit code: %s", exc.code) if exc.code == ExitCodes.RESTART:
if tux is not None: from .__run__ import run # reimport to load changes
loop.run_until_complete(shutdown_handler(tux, None, exc.code)) run()
except Exception as exc: else:
log.error("Unexpected exception (%s): ", type(exc)) raise exc
except Exception:
console.print_exception() console.print_exception()
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, 1))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
log.info("Please wait, cleaning up a bit more")
loop.run_until_complete(asyncio.sleep(1))
asyncio.set_event_loop(None)
loop.stop()
loop.close()
exit_code = ExitCodes.CRITICAL if tux is None else tux.shutdown_code
sys.exit(exit_code)
if __name__ == "__main__": if __name__ == "__main__":

315
tuxbot/__run__.py Normal file
View file

@ -0,0 +1,315 @@
import argparse
import asyncio
import json
import logging
import signal
import sys
import os
from argparse import Namespace
from typing import NoReturn
import discord
import pip
import tracemalloc
from rich.columns import Columns
from rich.console import Console
from rich.panel import Panel
from rich.traceback import install
from rich.table import Table, box
from rich.text import Text
from rich import print
import tuxbot.logging
from tuxbot.core import data_manager
from tuxbot.core.bot import Tux
from . import __version__, version_info, ExitCodes
log = logging.getLogger("tuxbot.main")
console = Console()
install(console=console)
tracemalloc.start()
def list_instances() -> NoReturn:
"""List all available instances
"""
with data_manager.config_file.open() as fs:
data = json.load(fs)
console.print(
Panel("[bold green]Instances", style="green"),
justify="center"
)
console.print()
columns = Columns(expand=True, padding=2, align="center")
for instance, details in data.items():
is_running = details.get('IS_RUNNING')
table = Table(
style="dim", border_style="not dim",
box=box.HEAVY_HEAD
)
table.add_column("Name")
table.add_column(("Running" if is_running else "Down") + " since")
table.add_row(instance, "42")
table.title = Text(
instance,
style="green" if is_running else "red"
)
columns.add_renderable(table)
console.print(columns)
console.print()
sys.exit(os.EX_OK)
def debug_info() -> NoReturn:
"""Show debug info relatives to the bot
"""
python_version = sys.version.replace("\n", "")
pip_version = pip.__version__
tuxbot_version = __version__
dpy_version = discord.__version__
uptime = os.popen('uptime').read().strip().split()
console.print(
Panel("[bold blue]Debug Info", style="blue"),
justify="center"
)
console.print()
columns = Columns(expand=True, padding=2, align="center")
table = Table(
style="dim", border_style="not dim",
box=box.HEAVY_HEAD
)
table.add_column(
"Bot Info",
)
table.add_row(f"[u]Tuxbot version:[/u] {tuxbot_version}")
table.add_row(f"[u]Major:[/u] {version_info.major}")
table.add_row(f"[u]Minor:[/u] {version_info.minor}")
table.add_row(f"[u]Micro:[/u] {version_info.micro}")
table.add_row(f"[u]Level:[/u] {version_info.releaselevel}")
table.add_row(f"[u]Last change:[/u] {version_info.info}")
columns.add_renderable(table)
table = Table(
style="dim", border_style="not dim",
box=box.HEAVY_HEAD
)
table.add_column(
"Python Info",
)
table.add_row(f"[u]Python version:[/u] {python_version}")
table.add_row(f"[u]Python executable path:[/u] {sys.executable}")
table.add_row(f"[u]Pip version:[/u] {pip_version}")
table.add_row(f"[u]Discord.py version:[/u] {dpy_version}")
columns.add_renderable(table)
table = Table(
style="dim", border_style="not dim",
box=box.HEAVY_HEAD
)
table.add_column(
"Server Info",
)
table.add_row(f"[u]System:[/u] {os.uname().sysname}")
table.add_row(f"[u]System arch:[/u] {os.uname().machine}")
table.add_row(f"[u]Kernel:[/u] {os.uname().release}")
table.add_row(f"[u]User:[/u] {os.getlogin()}")
table.add_row(f"[u]Uptime:[/u] {uptime[2]}")
table.add_row(
f"[u]Load Average:[/u] {' '.join(map(str, os.getloadavg()))}"
)
columns.add_renderable(table)
console.print(columns)
console.print()
sys.exit(os.EX_OK)
def parse_cli_flags(args: list) -> Namespace:
"""Parser for cli values.
Parameters
----------
args:list
Is a list of all passed values.
Returns
-------
Namespace
"""
parser = argparse.ArgumentParser(
description="Tuxbot - OpenSource bot",
usage="tuxbot <instance_name> [arguments]",
)
parser.add_argument(
"--version", "-V", action="store_true",
help="Show tuxbot's used version"
)
parser.add_argument(
"--debug", action="store_true",
help="Show debug information."
)
parser.add_argument(
"--list-instances", "-L", action="store_true",
help="List all instance names"
)
parser.add_argument("--token", "-T", type=str,
help="Run Tuxbot with passed token")
parser.add_argument(
"instance_name",
nargs="?",
help="Name of the bot instance created during `tuxbot-setup`.",
)
args = parser.parse_args(args)
return args
async def shutdown_handler(tux: Tux, signal_type, exit_code=None) -> NoReturn:
"""Handler when the bot shutdown
It cancels all running task.
Parameters
----------
tux:Tux
Object for the bot.
signal_type:int, None
Exiting signal code.
exit_code:None|int
Code to show when exiting.
"""
if signal_type:
log.info("%s received. Quitting...", signal_type)
elif exit_code is None:
log.info("Shutting down from unhandled exception")
tux.shutdown_code = ExitCodes.CRITICAL
if exit_code is not None:
tux.shutdown_code = exit_code
await tux.shutdown()
async def run_bot(tux: Tux, cli_flags: Namespace) -> None:
"""This run the bot.
Parameters
----------
tux:Tux
Object for the bot.
cli_flags:Namespace
All different flags passed in the console.
Returns
-------
None
When exiting, this function return None.
"""
data_path = data_manager.data_path(tux.instance_name)
tuxbot.logging.init_logging(10, location=data_path / "logs")
log.debug("====Basic Config====")
log.debug("Data Path: %s", data_path)
if cli_flags.token:
token = cli_flags.token
else:
token = tux.config("core").get("token")
if not token:
log.critical("Token must be set if you want to login.")
sys.exit(ExitCodes.CRITICAL)
try:
await tux.load_packages()
console.print()
await tux.start(token=token, bot=True)
except discord.LoginFailure:
log.critical("This token appears to be valid.")
console.print()
console.print(
"[prompt.invalid]This token appears to be valid. [i]exiting...[/i]"
)
sys.exit(ExitCodes.CRITICAL)
except Exception as e:
raise e
return None
def run() -> NoReturn:
"""Main function
"""
tux = None
cli_flags = parse_cli_flags(sys.argv[1:])
if cli_flags.list_instances:
list_instances()
elif cli_flags.debug:
debug_info()
elif cli_flags.version:
print(f"Tuxbot V{version_info.major}")
print(f"Complete Version: {__version__}")
sys.exit(os.EX_OK)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
if not cli_flags.instance_name:
console.print(
"[red]No instance provided ! "
"You can use 'tuxbot -L' to list all available instances"
)
sys.exit(ExitCodes.CRITICAL)
tux = Tux(
cli_flags=cli_flags,
description="Tuxbot, made from and for OpenSource",
dm_help=None,
)
loop.run_until_complete(run_bot(tux, cli_flags))
except KeyboardInterrupt:
console.print(
" [red]Please use <prefix>quit instead of Ctrl+C to Shutdown!"
)
log.warning("Please use <prefix>quit instead of Ctrl+C to Shutdown!")
log.info("Received KeyboardInterrupt")
console.print("[i]Trying to shutdown...")
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, signal.SIGINT))
except SystemExit as exc:
log.info("Shutting down with exit code: %s", exc.code)
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, exc.code))
except Exception as exc:
log.error("Unexpected exception (%s): ", type(exc))
console.print_exception()
if tux is not None:
loop.run_until_complete(shutdown_handler(tux, None, 1))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
log.info("Please wait, cleaning up a bit more")
loop.run_until_complete(asyncio.sleep(1))
asyncio.set_event_loop(None)
loop.stop()
loop.close()
exit_code = ExitCodes.CRITICAL if tux is None else tux.shutdown_code
sys.exit(exit_code)

View file

@ -1,6 +1,7 @@
from collections import namedtuple from collections import namedtuple
from .admin import Admin from .admin import Admin
from .config import AdminConfig
from ...core.bot import Tux from ...core.bot import Tux
VersionInfo = namedtuple("VersionInfo", "major minor micro release_level") VersionInfo = namedtuple("VersionInfo", "major minor micro release_level")

View file

@ -6,7 +6,10 @@ from discord.ext import commands
from tuxbot.core import checks from tuxbot.core import checks
from tuxbot.core.bot import Tux from tuxbot.core.bot import Tux
from tuxbot.core.i18n import Translator, find_locale, get_locale_name, available_locales from tuxbot.core.i18n import Translator, find_locale, get_locale_name, available_locales
from tuxbot.core.utils.functions.extra import group_extra, ContextPlus from tuxbot.core.utils.functions.extra import (
group_extra, command_extra,
ContextPlus
)
log = logging.getLogger("tuxbot.cogs.admin") log = logging.getLogger("tuxbot.cogs.admin")
_ = Translator("Admin", __file__) _ = Translator("Admin", __file__)
@ -40,10 +43,30 @@ class Admin(commands.Cog, name="Admin"):
@_lang.command(name="list", aliases=["liste", "all", "view"]) @_lang.command(name="list", aliases=["liste", "all", "view"])
async def _lang_list(self, ctx: ContextPlus): async def _lang_list(self, ctx: ContextPlus):
description = ''
for key, value in available_locales.items():
description += f":flag_{key[-2:].lower()}: {value[0]}\n"
e = discord.Embed( e = discord.Embed(
title=_("List of available locales: ", ctx, self.bot.config), title=_("List of available locales: ", ctx, self.bot.config),
description="\n".join([i[0] for i in available_locales.values()]), description=description,
color=0x36393E, color=0x36393E,
) )
await ctx.send(embed=e) await ctx.send(embed=e)
# =========================================================================
@command_extra(name="quit", aliases=["shutdown"], deletable=False)
@commands.guild_only()
@checks.is_owner()
async def _quit(self, ctx: ContextPlus):
await ctx.send("*quit...*")
await self.bot.shutdown()
@command_extra(name="restart", deletable=False)
@commands.guild_only()
@checks.is_owner()
async def _restart(self, ctx: ContextPlus):
await ctx.send("*restart...*")
await self.bot.shutdown(restart=True)

View file

@ -0,0 +1,18 @@
from structured_config import Structure, StrField
class AdminConfig(Structure):
dm: str = StrField("")
mentions: str = StrField("")
guilds: str = StrField("")
errors: str = StrField("")
gateway: str = StrField("")
extra = {
'dm': str,
'mentions': str,
'guilds': str,
'errors': str,
'gateway': str,
}

View file

@ -25,7 +25,10 @@ log = logging.getLogger("tuxbot")
console = Console() console = Console()
install(console=console) install(console=console)
packages: List[str] = ["jishaku", "tuxbot.cogs.warnings", "tuxbot.cogs.admin"] packages: List[str] = [
"jishaku",
"tuxbot.cogs.admin"
]
class Tux(commands.AutoShardedBot): class Tux(commands.AutoShardedBot):
@ -109,6 +112,7 @@ class Tux(commands.AutoShardedBot):
self._progress.get("main").remove_task( self._progress.get("main").remove_task(
self._progress.get("tasks")["connecting"] self._progress.get("tasks")["connecting"]
) )
self._progress.get("tasks").pop("connecting")
console.clear() console.clear()
console.print( console.print(
@ -155,8 +159,8 @@ class Tux(commands.AutoShardedBot):
console.print(columns) console.print(columns)
console.print() console.print()
async def is_owner(self, async def is_owner(self, user: Union[discord.User, discord.Member])\
user: Union[discord.User, discord.Member]) -> bool: -> bool:
"""Determines if the user is a bot owner. """Determines if the user is a bot owner.
Parameters Parameters
@ -245,7 +249,7 @@ class Tux(commands.AutoShardedBot):
for task in pending: for task in pending:
console.log("Canceling", task.get_name(), f"({task.get_coro()})") console.log("Canceling", task.get_name(), f"({task.get_coro()})")
task.cancel() task.cancel()
await asyncio.gather(*pending, return_exceptions=True) await asyncio.gather(*pending, return_exceptions=False)
await super().logout() await super().logout()
@ -265,4 +269,8 @@ class Tux(commands.AutoShardedBot):
self.shutdown_code = ExitCodes.RESTART self.shutdown_code = ExitCodes.RESTART
await self.logout() await self.logout()
sys.exit(self.shutdown_code)
sys_e = SystemExit()
sys_e.code = self.shutdown_code
raise sys_e

View file

@ -1,8 +1,10 @@
import asyncio import asyncio
import json
import logging import logging
from typing import List, Dict, Union, Any from typing import List, Dict
from flatten_dict import flatten, unflatten from structured_config import (
ConfigFile,
Structure, IntField, StrField, BoolField
)
import discord import discord
@ -13,159 +15,44 @@ __all__ = ["Config"]
log = logging.getLogger("tuxbot.core.config") log = logging.getLogger("tuxbot.core.config")
class Config: class Server(Structure):
def __init__(self, cog_instance: str = None): prefixes: List[str] = []
self._cog_instance = cog_instance disabled_command: List[str] = []
locale: str = StrField("")
self.lock = asyncio.Lock()
self.loop = asyncio.get_event_loop()
self._settings_file = None class User(Structure):
self._datas = {} aliases: List[dict] = []
locale: str = StrField("")
def __getitem__(self, item) -> Dict:
path = data_path(self._cog_instance)
if item != "core": class Config(Structure):
path = path / "cogs" / item class Servers(Structure):
else: count: int = IntField(0)
path /= "core" all: List[Server] = []
settings_file = path / "settings.json" class Users(Structure):
all: List[User] = []
if not settings_file.exists(): class Core(Structure):
raise FileNotFoundError( owners_id: List[int] = []
f"Unable to find settings file " f"'{settings_file}'" prefixes: List[str] = []
) token: str = StrField("")
else: mentionable: bool = BoolField("")
with settings_file.open("r") as f: locale: str = StrField("")
return json.load(f)
def __call__(self, item): class Cogs(Structure):
return self.__getitem__(item) pass
def owners_id(self) -> List[int]:
"""Simply return the owners id saved in config file.
Returns # =============================================================================
------- # Configuration of Tuxbot Application (not the bot)
str # =============================================================================
Owners id.
"""
return self.__getitem__("core").get("owners_id")
def token(self) -> str: class Instance(Structure):
"""Simply return the bot token saved in config file. path: str = StrField("")
active: bool = BoolField(False)
Returns
-------
str
Bot token.
"""
return self.__getitem__("core").get("token")
def get_prefixes(self, guild: discord.Guild) -> List[str]: class AppConfig(Structure):
"""Get custom prefixes for one guild. instances: Dict[str, Instance] = {}
Parameters
----------
guild:discord.Guild
The required guild prefixes.
Returns
-------
List[str]
List of all prefixes.
"""
core = self.__getitem__("core")
prefixes = core.get("guild", {}).get(guild.id, {}).get("prefixes", [])
return prefixes
def get_blacklist(self, key: str) -> List[Union[str, int]]:
"""Return list off all blacklisted values
Parameters
----------
key:str
Which type of blacklist to choice (guilds ? channels ?,...).
Returns
-------
List[Union[str, int]]
List containing blacklisted values.
"""
core = self.__getitem__("core")
blacklist = core.get("blacklist", {}).get(key, [])
return blacklist
def _dump(self):
with self._settings_file.open("w") as f:
json.dump(self._datas, f, indent=4)
async def update(self, cog_name: str, item: str, value: Any) -> dict:
"""Update values in config file.
Parameters
----------
cog_name:str
Name of cog who's corresponding to the config file.
item:str
Key to update.
value:Any
New values to apply.
Returns
-------
dict:
Updated values.
"""
datas = self.__getitem__(cog_name)
path = data_path(self._cog_instance)
flat_datas = flatten(datas)
flat_datas[tuple(item.split("."))] = value
datas = unflatten(flat_datas)
self._datas = datas
if cog_name != "core":
path = path / "cogs" / cog_name
else:
path /= "core"
self._settings_file = path / "settings.json"
async with self.lock:
await self.loop.run_in_executor(None, self._dump)
return datas
def get_value(self, cog_name: str, key: str, default: Any = None) -> Any:
"""Get value by key.
Parameters
----------
cog_name:str
Name of cog who's corresponding to the config file.
key:str
Key to fetch.
default:Any|Optional
Default value.
Returns
-------
Any:
Recovered value.
"""
datas = self.__getitem__(cog_name)
flat_datas = flatten(datas)
try:
return flat_datas[tuple(key.split("."))]
except KeyError:
return default

View file

@ -7,7 +7,7 @@ log = logging.getLogger("tuxbot.core.data_manager")
app_dir = appdirs.AppDirs("Tuxbot-bot") app_dir = appdirs.AppDirs("Tuxbot-bot")
config_dir = Path(app_dir.user_config_dir) config_dir = Path(app_dir.user_config_dir)
config_file = config_dir / "config.json" config_file = config_dir / "config.yaml"
def data_path(instance_name: str) -> Path: def data_path(instance_name: str) -> Path:

View file

@ -0,0 +1,9 @@
from discord.ext import commands
class DisabledCommandByServerOwner(commands.CheckFailure):
pass
class DisabledCommandByBotOwner(commands.CheckFailure):
pass

View file

@ -38,14 +38,14 @@ def get_locale_name(locale: str) -> str:
class Translator(Callable[[str], str]): class Translator(Callable[[str], str]):
"""Class to load texts at init.""" """Class to load texts at init."""
def __init__(self, name: str, file_location: Union[str, Path, os.PathLike]): def __init__(self, name: str, file_location: Union[Path, os.PathLike]):
"""Initializes the Translator object. """Initializes the Translator object.
Parameters Parameters
---------- ----------
name : str name : str
The cog name. The cog name.
file_location:str|Path|os.PathLike file_location:Path|os.PathLike
File path for the required extension. File path for the required extension.
""" """

View file

@ -1,89 +0,0 @@
import codecs
import itertools
import sys
def bordered(*columns: dict) -> str:
"""
credits to https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/develop/redbot/core/utils/chat_formatting.py
Get two blocks of text in a borders.
Note
----
This will only work with a monospaced font.
Parameters
----------
*columns : `sequence` of `str`
The columns of text, each being a list of lines in that column.
Returns
-------
str
The bordered text.
"""
encoder = codecs.getencoder(sys.stdout.encoding)
try:
encoder("┌┐└┘─│") # border symbols
except UnicodeEncodeError:
ascii_border = True
else:
ascii_border = False
borders = {
"TL": "+" if ascii_border else "", # Top-left
"TR": "+" if ascii_border else "", # Top-right
"BL": "+" if ascii_border else "", # Bottom-left
"BR": "+" if ascii_border else "", # Bottom-right
"HZ": "-" if ascii_border else "", # Horizontal
"VT": "|" if ascii_border else "", # Vertical
}
sep = " " * 4 # Separator between boxes
widths = tuple(
max(len(row) for row in column.get("rows")) + 9 for column in columns
) # width of each col
cols_done = [False] * len(columns) # whether or not each column is done
lines = [""]
for i, column in enumerate(columns):
lines[0] += (
"{TL}"
+ "{HZ}"
+ column.get("title")
+ "{HZ}" * (widths[i] - len(column.get("title")) - 1)
+ "{TR}"
+ sep
)
for line in itertools.zip_longest(*[column.get("rows") for column in columns]):
row = []
for colidx, column in enumerate(line):
width = widths[colidx]
done = cols_done[colidx]
if column is None:
if not done:
# bottom border of column
column = "{HZ}" * width
row.append("{BL}" + column + "{BR}")
cols_done[colidx] = True # mark column as done
else:
# leave empty
row.append(" " * (width + 2))
else:
column += " " * (width - len(column)) # append padded spaces
row.append("{VT}" + column + "{VT}")
lines.append(sep.join(row))
final_row = []
for width, done in zip(widths, cols_done):
if not done:
final_row.append("{BL}" + "{HZ}" * width + "{BR}")
else:
final_row.append(" " * (width + 2))
lines.append(sep.join(final_row))
return "\n".join(lines).format(**borders)

View file

@ -5,6 +5,11 @@ import discord
from discord import Embed from discord import Embed
from discord.ext import commands, flags from discord.ext import commands, flags
from rich.console import Console
console = Console()
console.clear()
class ContextPlus(commands.Context): class ContextPlus(commands.Context):
async def send(self, content=None, *args, **kwargs): async def send(self, content=None, *args, **kwargs):
@ -16,12 +21,11 @@ class ContextPlus(commands.Context):
e = str(kwargs.get('embed').to_dict()) e = str(kwargs.get('embed').to_dict())
e = e.replace(self.bot.config('core').get('token'), '<token>') e = e.replace(self.bot.config('core').get('token'), '<token>')
e = yaml.load(e, Loader=yaml.FullLoader) e = yaml.load(e, Loader=yaml.FullLoader)
kwargs['embed'] = Embed.from_dict(e) kwargs['embed'] = Embed.from_dict(e)
if ( if (
hasattr(self.command, "deletable") and self.command.deletable hasattr(self.command, "deletable") and self.command.deletable
) and kwargs.pop("deletable", True): ) or kwargs.pop("deletable", False):
message = await super().send(content, *args, **kwargs) message = await super().send(content, *args, **kwargs)
await message.add_reaction("🗑") await message.add_reaction("🗑")
@ -33,7 +37,10 @@ class ContextPlus(commands.Context):
) )
try: try:
await self.bot.wait_for("reaction_add", timeout=45.0, check=check) await self.bot.wait_for(
"reaction_add",
timeout=42.0, check=check
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
await message.remove_reaction("🗑", self.bot.user) await message.remove_reaction("🗑", self.bot.user)
else: else:

View file

@ -9,9 +9,9 @@ from rich.prompt import Prompt, IntPrompt
from rich.console import Console from rich.console import Console
from rich.rule import Rule from rich.rule import Rule
from rich.traceback import install from rich.traceback import install
from rich import print
from tuxbot.core.data_manager import config_dir, app_dir from tuxbot.core.data_manager import config_dir, app_dir
from tuxbot.core import config
console = Console() console = Console()
console.clear() console.clear()
@ -20,56 +20,15 @@ install(console=console)
try: try:
config_dir.mkdir(parents=True, exist_ok=True) config_dir.mkdir(parents=True, exist_ok=True)
except PermissionError: except PermissionError:
print(f"mkdir: cannot create directory '{config_dir}': Permission denied") console.print(f"mkdir: cannot create directory '{config_dir}': Permission denied")
sys.exit(1) sys.exit(1)
config_file = config_dir / "config.json" app_config = config.ConfigFile(config_dir / "config.yaml", config.AppConfig)
if not app_config.config.instances:
def load_existing_config() -> dict:
"""Loading and returning configs.
Returns
-------
dict
a dict containing all configurations.
"""
if not config_file.exists():
return {}
with config_file.open() as fs:
return json.load(fs)
instances_data = load_existing_config()
if not instances_data:
instances_list = [] instances_list = []
else: else:
instances_list = list(instances_data.keys()) instances_list = list(app_config.config.instances.keys())
def save_config(name: str, data: dict, delete=False) -> NoReturn:
"""save data in config file.
Parameters
----------
name:str
name of instance.
data:dict
settings for `name` instance.
delete:bool
delete or no data.
"""
_config = load_existing_config()
if delete and name in _config:
_config.pop(name)
else:
_config[name] = data
with config_file.open("w") as fs:
json.dump(_config, fs, indent=4)
def get_name() -> str: def get_name() -> str:
@ -89,8 +48,8 @@ def get_name() -> str:
console=console console=console
) )
if re.fullmatch(r"[a-zA-Z0-9_\-]*", name) is None: if re.fullmatch(r"[a-zA-Z0-9_\-]*", name) is None:
print() console.print()
print("[prompt.invalid]ERROR: Invalid characters provided") console.print("[prompt.invalid]ERROR: Invalid characters provided")
name = "" name = ""
return name return name
@ -111,14 +70,14 @@ def get_data_dir(instance_name: str) -> Path:
""" """
data_path = Path(app_dir.user_data_dir) / "data" / instance_name data_path = Path(app_dir.user_data_dir) / "data" / instance_name
data_path_input = "" data_path_input = ""
print() console.print()
def make_data_dir(path: Path) -> Union[Path, str]: def make_data_dir(path: Path) -> Union[Path, str]:
try: try:
path.mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
except OSError: except OSError:
print() console.print()
print( console.print(
f"mkdir: cannot create directory '{path}': Permission denied" f"mkdir: cannot create directory '{path}': Permission denied"
) )
path = "" path = ""
@ -137,8 +96,8 @@ def get_data_dir(instance_name: str) -> Path:
try: try:
exists = data_path_input.exists() exists = data_path_input.exists()
except OSError: except OSError:
print() console.print()
print( console.print(
"[prompt.invalid]" "[prompt.invalid]"
"Impossible to verify the validity of the path," "Impossible to verify the validity of the path,"
" make sure it does not contain any invalid characters." " make sure it does not contain any invalid characters."
@ -149,8 +108,8 @@ def get_data_dir(instance_name: str) -> Path:
if data_path_input and not exists: if data_path_input and not exists:
data_path_input = make_data_dir(data_path_input) data_path_input = make_data_dir(data_path_input)
print() console.print()
print( console.print(
f"You have chosen {data_path_input} to be your config directory for " f"You have chosen {data_path_input} to be your config directory for "
f"`{instance_name}` instance" f"`{instance_name}` instance"
) )
@ -160,7 +119,7 @@ def get_data_dir(instance_name: str) -> Path:
choices=["y", "n"], default="y", choices=["y", "n"], default="y",
console=console console=console
) != "y": ) != "y":
print("Rerun the process to redo this configuration.") console.print("Rerun the process to redo this configuration.")
sys.exit(0) sys.exit(0)
(data_path_input / "core").mkdir(parents=True, exist_ok=True) (data_path_input / "core").mkdir(parents=True, exist_ok=True)
@ -191,7 +150,7 @@ def get_token() -> str:
r"|mfa\.[a-zA-Z0-9_\-]{84})", r"|mfa\.[a-zA-Z0-9_\-]{84})",
token) \ token) \
is None: is None:
print("[prompt.invalid]ERROR: Invalid token provided") console.print("[prompt.invalid]ERROR: Invalid token provided")
token = "" token = ""
return token return token
@ -234,7 +193,7 @@ def get_multiple(
if new not in values: if new not in values:
values.append(new) values.append(new)
else: else:
print( console.print(
f"[prompt.invalid]" f"[prompt.invalid]"
f"ERROR: `{new}` is already present, [i]ignored[/i]" f"ERROR: `{new}` is already present, [i]ignored[/i]"
) )
@ -250,21 +209,21 @@ def additional_config() -> dict:
dict: dict:
Dict with cog name as key and configs as value. Dict with cog name as key and configs as value.
""" """
p = Path(r"tuxbot/cogs").glob("**/additional_config.json") p = Path("tuxbot/cogs").glob("**/config.py")
data = {} data = {}
for file in p: for file in p:
print("\n" * 4) console.print("\n" * 4)
cog_name = str(file.parent).split("/")[-1] cog_name = str(file.parent).split("/")[-1]
data[cog_name] = {} data[cog_name] = {}
with file.open("r") as f: with file.open("r") as f:
data = json.load(f) data = json.load(f)
print(Rule(f"\nConfiguration for `{cog_name}` module")) console.print(Rule(f"\nConfiguration for `{cog_name}` module"))
for key, value in data.items(): for key, value in data.items():
print() console.print()
data[cog_name][key] = Prompt.ask(value["description"]) data[cog_name][key] = Prompt.ask(value["description"])
return data return data
@ -278,79 +237,62 @@ def finish_setup(data_dir: Path) -> NoReturn:
data_dir:Path data_dir:Path
Where to save configs. Where to save configs.
""" """
print( console.print(
Rule( Rule(
"Now, it's time to finish this setup by giving bot information" "Now, it's time to finish this setup by giving bot information"
) )
) )
print() console.print()
token = get_token() token = get_token()
print() console.print()
prefixes = get_multiple( prefixes = get_multiple(
"Choice a (or multiple) prefix for the bot", "Add another prefix ?", "Choice a (or multiple) prefix for the bot", "Add another prefix ?",
str str
) )
print() console.print()
mentionable = Prompt.ask( mentionable = Prompt.ask(
"Does the bot answer if it's mentioned?", "Does the bot answer if it's mentioned?",
choices=["y", "n"], choices=["y", "n"],
default="y" default="y"
) == "y" ) == "y"
print() console.print()
owners_id = get_multiple( owners_id = get_multiple(
"Give the owner id of this bot", "Add another owner ?", int "Give the owner id of this bot", "Add another owner ?", int
) )
cogs_config = additional_config() # cogs_config = additional_config()
core_file = data_dir / "core" / "settings.json" instance_config = config.ConfigFile(
core = { str(data_dir / "config.yaml"), config.Config
"token": token, )
"prefixes": prefixes,
"mentionable": mentionable,
"owners_id": owners_id,
"locale": "en-US",
}
with core_file.open("w") as fs: instance_config.config.Core.owners_id = owners_id
json.dump(core, fs, indent=4) instance_config.config.Core.prefixes = prefixes
instance_config.config.Core.token = token
for cog, data in cogs_config.items(): instance_config.config.Core.mentionable = mentionable
data_cog_dir = data_dir / "cogs" / cog instance_config.config.Core.locale = "en-US"
data_cog_dir.mkdir(parents=True, exist_ok=True)
data_cog_file = data_cog_dir / "settings.json"
with data_cog_file.open("w") as fs:
json.dump(data, fs, indent=4)
def basic_setup() -> NoReturn: def basic_setup() -> NoReturn:
"""Configs who refer to instances. """Configs who refer to instances.
""" """
print( console.print(
Rule( Rule(
"Hi ! it's time for you to give me information about you instance" "Hi ! it's time for you to give me information about you instance"
) )
) )
print() console.print()
name = get_name() name = get_name()
data_dir = get_data_dir(name) data_dir = get_data_dir(name)
configs = load_existing_config()
instance_config = configs[name] if name in instances_list else {}
instance_config["DATA_PATH"] = str(data_dir.resolve())
instance_config["IS_RUNNING"] = False
if name in instances_list: if name in instances_list:
print() console.print()
console.print( console.print(
f"WARNING: An instance named `{name}` already exists " f"WARNING: An instance named `{name}` already exists "
f"Continuing will overwrite this instance configs.", style="red" f"Continuing will overwrite this instance configs.", style="red"
@ -359,17 +301,21 @@ def basic_setup() -> NoReturn:
"Are you sure you want to continue?", "Are you sure you want to continue?",
choices=["y", "n"], default="n" choices=["y", "n"], default="n"
) == "n": ) == "n":
print("Abandon...") console.print("Abandon...")
sys.exit(0) sys.exit(0)
save_config(name, instance_config) instance = config.Instance()
instance.path = str(data_dir.resolve())
instance.active = False
print("\n" * 4) app_config.config.instances[name] = instance
console.print("\n" * 4)
finish_setup(data_dir) finish_setup(data_dir)
print() console.print()
print( console.print(
f"Instance successfully created! " f"Instance successfully created! "
f"You can now run `tuxbot {name}` to launch this instance" f"You can now run `tuxbot {name}` to launch this instance"
) )
@ -392,8 +338,8 @@ def setup() -> NoReturn:
basic_setup() basic_setup()
except KeyboardInterrupt: except KeyboardInterrupt:
print("Exiting...") console.print("Exiting...")
except: except Exception:
console.print_exception() console.print_exception()