tuxbot-bot/tuxbot/cogs/Network/network.py

414 lines
13 KiB
Python

import asyncio
import logging
import time
from datetime import datetime
from typing import Optional
import aiohttp
import discord
from aiohttp import ClientConnectorError, InvalidURL, TCPConnector
from jishaku.models import copy_context_with
from discord.ext import commands, tasks
from ipinfo.exceptions import RequestQuotaExceededError
from structured_config import ConfigFile
from tuxbot.cogs.Network.functions.converters import (
IPConverter,
IPParamsConverter,
DomainConverter,
QueryTypeConverter,
ASConverter,
)
from tuxbot.cogs.Network.functions.exceptions import (
RFC18,
InvalidIp,
VersionNotFound,
InvalidDomain,
InvalidQueryType,
InvalidAsn,
)
from tuxbot.core.bot import Tux
from tuxbot.core.i18n import (
Translator,
)
from tuxbot.core.utils.data_manager import cogs_data_path
from tuxbot.core.utils.functions.extra import (
ContextPlus,
command_extra,
)
from tuxbot.core.utils.functions.utils import shorten, str_if_empty
from .config import NetworkConfig
from .functions.utils import (
get_ip,
get_hostname,
get_crimeflare_result,
get_ipinfo_result,
get_ipwhois_result,
get_map_bytes,
get_pydig_result,
merge_ipinfo_ipwhois,
check_query_type_or_raise,
check_ip_version_or_raise,
check_asn_or_raise,
)
log = logging.getLogger("tuxbot.cogs.Network")
_ = Translator("Network", __file__)
class Network(commands.Cog):
_peeringdb_net: Optional[dict]
def __init__(self, bot: Tux):
self.bot = bot
self.__config: NetworkConfig = ConfigFile(
str(cogs_data_path("Network") / "config.yaml"),
NetworkConfig,
).config
self._peeringdb_net = None
self._update_peering_db.start() # pylint: disable=no-member
async def cog_command_error(self, ctx: ContextPlus, error):
if isinstance(
error,
(
RequestQuotaExceededError,
RFC18,
InvalidIp,
InvalidDomain,
InvalidQueryType,
VersionNotFound,
InvalidAsn,
),
):
await ctx.send(_(str(error), ctx, self.bot.config))
async def cog_before_invoke(self, ctx: ContextPlus):
await ctx.trigger_typing()
def cog_unload(self):
self._update_peering_db.cancel() # pylint: disable=no-member
@tasks.loop(hours=1.0)
async def _update_peering_db(self):
try:
async with aiohttp.ClientSession(
connector=TCPConnector(verify_ssl=False)
) as cs:
async with cs.get(
"https://3.233.208.117/api/net",
timeout=aiohttp.ClientTimeout(total=60),
) as s:
self._peeringdb_net = await s.json()
except asyncio.exceptions.TimeoutError:
pass
else:
log.log(logging.INFO, "_update_peering_db")
# =========================================================================
# =========================================================================
@command_extra(name="iplocalise", aliases=["localiseip"], deletable=True)
async def _iplocalise(
self,
ctx: ContextPlus,
ip: IPConverter,
*,
params: Optional[IPParamsConverter] = None,
):
# noinspection PyUnresolvedReferences
check_ip_version_or_raise(params) # type: ignore
# noinspection PyUnresolvedReferences
ip_address = await get_ip(
self.bot.loop, str(ip), params # type: ignore
)
ip_hostname = await get_hostname(self.bot.loop, str(ip_address))
ipinfo_result = await get_ipinfo_result(
self.bot.loop, self.__config.ipinfoKey, ip_address
)
ipwhois_result = await get_ipwhois_result(self.bot.loop, ip_address)
merged_results = merge_ipinfo_ipwhois(ipinfo_result, ipwhois_result)
e = discord.Embed(
title=_(
"Information for ``{ip} ({ip_address})``", ctx, self.bot.config
).format(ip=ip, ip_address=ip_address),
color=0x5858D7,
)
e.add_field(
name=_("Belongs to:", ctx, self.bot.config),
value=merged_results["belongs"],
inline=True,
)
e.add_field(
name="RIR :",
value=merged_results["rir"],
inline=True,
)
e.add_field(
name=_("Region:", ctx, self.bot.config),
value=merged_results["region"],
inline=False,
)
e.set_thumbnail(url=merged_results["flag"])
e.set_footer(
text=_("Hostname: {hostname}", ctx, self.bot.config).format(
hostname=ip_hostname
),
)
kwargs: dict = {}
# noinspection PyUnresolvedReferences
if (
params is not None
and params["map"]
and ( # type: ignore
map_bytes := await get_map_bytes(
self.__config.geoapifyKey, merged_results["map"]
)
)
):
file = discord.File(map_bytes, "map.png")
e.set_image(url="attachment://map.png")
kwargs["file"] = file
kwargs["embed"] = e
return await ctx.send(f"https://ipinfo.io/{ip_address}#", **kwargs)
@command_extra(
name="cloudflare", aliases=["cf", "crimeflare"], deletable=True
)
async def _cloudflare(
self,
ctx: ContextPlus,
ip: DomainConverter,
):
crimeflare_result = await get_crimeflare_result(str(ip))
if crimeflare_result:
alt_ctx = await copy_context_with(
ctx, content=f"{ctx.prefix}iplocalise {crimeflare_result}"
)
return await alt_ctx.command.reinvoke(alt_ctx)
await ctx.send(
_(
"Unable to collect information through CloudFlare",
ctx,
self.bot.config,
)
)
@command_extra(name="getheaders", aliases=["headers"], deletable=True)
async def _getheaders(
self, ctx: ContextPlus, ip: DomainConverter, *, user_agent: str = ""
):
try:
headers = {"User-Agent": user_agent}
colors = {
"1": 0x17A2B8,
"2": 0x28A745,
"3": 0xFFC107,
"4": 0xDC3545,
"5": 0x343A40,
}
async with aiohttp.ClientSession() as cs:
async with cs.get(
str(ip),
headers=headers,
timeout=aiohttp.ClientTimeout(total=8),
) as s:
e = discord.Embed(
title=f"Headers : {ip}",
color=colors.get(str(s.status)[0], 0x6C757D),
)
e.add_field(
name="Status", value=f"```{s.status}```", inline=True
)
e.set_thumbnail(url=f"https://http.cat/{s.status}")
headers = dict(s.headers.items())
headers.pop("Set-Cookie", headers)
fail = False
for key, value in headers.items():
fail, output = await shorten(value, 50, fail)
if output["link"]:
value = _(
"[show all]({})", ctx, self.bot.config
).format(output["link"])
else:
value = f"```\n{output['text']}```"
e.add_field(name=key, value=value, inline=True)
await ctx.send(embed=e)
except (
ClientConnectorError,
InvalidURL,
asyncio.exceptions.TimeoutError,
):
await ctx.send(
_("Cannot connect to host {}", ctx, self.bot.config).format(ip)
)
@command_extra(name="dig", deletable=True)
async def _dig(
self,
ctx: ContextPlus,
domain: IPConverter,
query_type: QueryTypeConverter,
dnssec: str | bool = False,
):
check_query_type_or_raise(str(query_type))
pydig_result = await get_pydig_result(
self.bot.loop, str(domain), str(query_type), dnssec
)
e = discord.Embed(title=f"DIG {domain} {query_type}", color=0x5858D7)
for i, value in enumerate(pydig_result):
e.add_field(name=f"#{i}", value=f"```{value}```")
if not pydig_result:
e.add_field(
name=f"DIG {domain} IN {query_type}",
value=_("No result...", ctx, self.bot.config),
)
await ctx.send(embed=e)
@command_extra(name="ping", deletable=True)
async def _ping(self, ctx: ContextPlus):
start = time.perf_counter()
await ctx.trigger_typing()
end = time.perf_counter()
latency = round(self.bot.latency * 1000, 2)
typing = round((end - start) * 1000, 2)
e = discord.Embed(title="Ping", color=discord.Color.teal())
e.add_field(name="Websocket", value=f"{latency}ms")
e.add_field(name="Typing", value=f"{typing}ms")
await ctx.send(embed=e)
@command_extra(name="isdown", aliases=["is_down", "down?"], deletable=True)
async def _isdown(self, ctx: ContextPlus, domain: IPConverter):
try:
url = f"https://www.isthissitedown.org/site/{domain}"
async with aiohttp.ClientSession() as cs:
async with cs.get(
url,
timeout=aiohttp.ClientTimeout(total=8),
) as s:
text = await s.text()
if "is up!" in text:
title = _("Up!", ctx, self.bot.config)
color = 0x28A745
else:
title = _("Down...", ctx, self.bot.config)
color = 0xDC3545
e = discord.Embed(title=title, color=color)
await ctx.send(url, embed=e)
except (
ClientConnectorError,
InvalidURL,
asyncio.exceptions.TimeoutError,
):
await ctx.send(
_("Cannot connect to host {}", ctx, self.bot.config).format(
domain
)
)
@command_extra(
name="peeringdb", aliases=["peer", "peering"], deletable=True
)
async def _peeringdb(self, ctx: ContextPlus, asn: ASConverter):
check_asn_or_raise(str(asn))
data = {}
if self._peeringdb_net is None:
return await ctx.send(
_(
"Please retry in few minutes",
ctx,
self.bot.config,
).format(asn=asn)
)
for _data in self._peeringdb_net["data"]:
if _data.get("asn", None) == int(str(asn)):
data = _data
break
if not data:
return await ctx.send(
_(
"AS{asn} could not be found in PeeringDB's database.",
ctx,
self.bot.config,
).format(asn=asn)
)
filtered = {
"info_type": "Type",
"info_traffic": "Traffic",
"info_ratio": "Ratio",
"info_prefixes4": "Prefixes IPv4",
"info_prefixes6": "Prefixes IPv6",
}
filtered_link = {
"website": ("Site", "website"),
"looking_glass": ("Looking Glass", "looking_glass"),
"policy_general": ("Peering", "policy_url"),
}
e = discord.Embed(
title=f"{data['name']} ({str_if_empty(data['aka'], f'AS{asn}')})",
color=0x5858D7,
)
for key, name in filtered.items():
e.add_field(
name=name, value=f"```{str_if_empty(data.get(key), 'N/A')}```"
)
for key, names in filtered_link.items():
if data.get(key):
e.add_field(
name=names[0],
value=f"[{str_if_empty(data.get(key), 'N/A')}]"
f"({str_if_empty(data.get(names[1]), 'N/A')})",
)
if data["notes"]:
output = (await shorten(data["notes"], 550))[1]
e.description = output["text"]
if data["created"]:
e.timestamp = datetime.strptime(
data["created"], "%Y-%m-%dT%H:%M:%SZ"
)
await ctx.send(f"https://www.peeringdb.com/net/{data['id']}", embed=e)