import asyncio import functools import logging import time from typing import Union import aiohttp import discord from aiohttp import ClientConnectorError from discord.ext import commands from ipinfo.exceptions import RequestQuotaExceededError from structured_config import ConfigFile from tuxbot.cogs.Network.functions.converters import ( IPConverter, IPVersionConverter, DomainConverter, QueryTypeConverter, ) from tuxbot.cogs.Network.functions.exceptions import ( RFC18, InvalidIp, VersionNotFound, InvalidDomain, InvalidQueryType, ) from tuxbot.core.bot import Tux from tuxbot.core.i18n import ( Translator, ) from tuxbot.core.utils.data_manager import cogs_data_path from tuxbot.core.utils.functions.extra import ( ContextPlus, command_extra, ) from tuxbot.core.utils.functions.utils import shorten from .config import NetworkConfig from .functions.utils import ( get_ip, get_hostname, get_ipinfo_result, get_ipwhois_result, merge_ipinfo_ipwhois, get_pydig_result, check_query_type_or_raise, check_ip_version_or_raise, ) log = logging.getLogger("tuxbot.cogs.Network") _ = Translator("Network", __file__) class Network(commands.Cog, name="Network"): def __init__(self, bot: Tux): self.bot = bot self.__config: NetworkConfig = ConfigFile( str(cogs_data_path("Network") / "config.yaml"), NetworkConfig, ).config async def cog_command_error(self, ctx, error): if isinstance( error, ( RequestQuotaExceededError, RFC18, InvalidIp, InvalidDomain, InvalidQueryType, VersionNotFound, ), ): await ctx.send(_(str(error), ctx, self.bot.config)) # ========================================================================= # ========================================================================= @command_extra(name="iplocalise", aliases=["localiseip"], deletable=True) async def _iplocalise( self, ctx: ContextPlus, ip: IPConverter, version: IPVersionConverter = "", ): check_ip_version_or_raise(str(version)) tmp = await ctx.send( _("*Retrieving information...*", ctx, self.bot.config), deletable=False, ) ip_address = await get_ip(str(ip), str(version), tmp) ip_hostname = await get_hostname(ip_address) ipinfo_result = await get_ipinfo_result( self.__config.ipinfoKey, ip_address ) ipwhois_result = await get_ipwhois_result(ip_address, tmp) merged_results = merge_ipinfo_ipwhois(ipinfo_result, ipwhois_result) e = discord.Embed( title=_( "Information for ``{ip} ({ip_address})``", ctx, self.bot.config ).format(ip=ip, ip_address=ip_address), color=0x5858D7, ) e.add_field( name=_("Belongs to:", ctx, self.bot.config), value=merged_results["belongs"], inline=True, ) e.add_field( name="RIR :", value=merged_results["rir"], inline=True, ) e.add_field( name=_("Region:", ctx, self.bot.config), value=merged_results["region"], inline=False, ) e.set_thumbnail(url=merged_results["flag"]) e.set_footer( text=_("Hostname: {hostname}", ctx, self.bot.config).format( hostname=ip_hostname ), ) await tmp.delete() await ctx.send(embed=e) @command_extra(name="getheaders", aliases=["headers"], deletable=True) async def _getheaders( self, ctx: ContextPlus, ip: DomainConverter, *, user_agent: str = "" ): try: headers = {"User-Agent": user_agent} colors = { "1": 0x17A2B8, "2": 0x28A745, "3": 0xFFC107, "4": 0xDC3545, "5": 0x343A40, } async with ctx.session.get( str(ip), headers=headers, timeout=aiohttp.ClientTimeout(total=4), ) as s: e = discord.Embed( title=f"Headers : {ip}", color=colors.get(str(s.status)[0], 0x6C757D), ) e.add_field( name="Status", value=f"```{s.status}```", inline=True ) e.set_thumbnail(url=f"https://http.cat/{s.status}") headers = dict(s.headers.items()) headers.pop("Set-Cookie", headers) for key, value in headers.items(): output = await shorten(ctx.session, value, 50) if output["link"] is not None: value = _( "[show all]({})", ctx, self.bot.config ).format(output["link"]) else: value = f"```\n{output['text']}```" e.add_field(name=key, value=value, inline=True) await ctx.send(embed=e) except (ClientConnectorError, asyncio.exceptions.TimeoutError): await ctx.send( _("Cannot connect to host {}", ctx, self.bot.config).format(ip) ) @command_extra(name="dig", deletable=True) async def _dig( self, ctx: ContextPlus, domain: IPConverter, query_type: QueryTypeConverter, dnssec: Union[str, bool] = False, ): check_query_type_or_raise(str(query_type)) pydig_result = await self.bot.loop.run_in_executor( None, functools.partial(get_pydig_result, domain, query_type, dnssec), ) e = discord.Embed(title=f"DIG {domain} {query_type}", color=0x5858D7) for i, value in enumerate(pydig_result): e.add_field(name=f"#{i}", value=f"```{value}```") if not pydig_result: e.add_field( name=f"DIG {domain} IN {query_type}", value=_("No result...", ctx, self.bot.config), ) await ctx.send(embed=e) @command_extra(name="ping", deletable=True) async def _ping(self, ctx: ContextPlus): start = time.perf_counter() await ctx.trigger_typing() end = time.perf_counter() latency = round(self.bot.latency * 1000, 2) typing = round((end - start) * 1000, 2) e = discord.Embed(title="Ping", color=discord.Color.teal()) e.add_field(name="Websocket", value=f"{latency}ms") e.add_field(name="Typing", value=f"{typing}ms") await ctx.send(embed=e)