tuxbot-bot/tuxbot/cogs/Network/functions/utils.py

197 lines
5.2 KiB
Python

import socket
from typing import NoReturn, Optional
import asyncio
import re
import ipinfo
import ipwhois
import pydig
import aiohttp
from ipinfo.exceptions import RequestQuotaExceededError
from ipwhois import Net
from ipwhois.asn import IPASN
from tuxbot.cogs.Network.functions.exceptions import (
VersionNotFound,
RFC18,
InvalidIp,
InvalidQueryType,
)
def _(x):
    """Return ``x`` unchanged.

    Identity stand-in that this module wraps around its user-facing
    messages (presumably a marker for a translation hook -- confirm
    against the rest of the project).
    """
    return x
def get_ip(ip: str, inet: str = "") -> str:
    """Resolve ``ip`` (host name or literal address) to an address string.

    Args:
        ip: Value handed to ``socket.getaddrinfo`` (converted with ``str``).
        inet: ``"4"`` restricts the lookup to IPv4, ``"6"`` to IPv6; any
            other value leaves the address family unspecified.

    Returns:
        The address of the first entry returned by the resolver.

    Raises:
        VersionNotFound: If the lookup fails with ``socket.gaierror``.
    """
    # 0 leaves the address family unspecified.
    _inet: socket.AddressFamily | int = 0  # pylint: disable=no-member

    if inet == "6":
        _inet = socket.AF_INET6
    elif inet == "4":
        _inet = socket.AF_INET

    try:
        # Use the first entry: a successful lookup always yields at least
        # one, whereas a fixed index of 1 raises IndexError whenever the
        # resolver returns a single entry for the host.
        return socket.getaddrinfo(str(ip), None, _inet)[0][4][0]
    except socket.gaierror as e:
        raise VersionNotFound(
            _(
                "Unable to collect information on this in the given "
                "version",
            )
        ) from e
async def get_hostname(loop, ip: str) -> str:
    """Reverse-resolve ``ip`` to a host name without blocking the event loop.

    Args:
        loop: Event loop whose default executor runs the blocking lookup.
        ip: Address to look up (converted with ``str`` before the lookup).

    Returns:
        The first element of ``socket.gethostbyaddr``'s result, or ``"N/A"``
        when the lookup fails or does not complete within 0.2 seconds.
    """

    def _get_hostname(_ip: str) -> str:
        try:
            # Use the stringified argument passed in by the caller rather
            # than the raw closure value.
            return socket.gethostbyaddr(_ip)[0]
        except (socket.herror, socket.gaierror):
            # Both resolver failures mean "no host name available".
            return "N/A"

    try:
        return await asyncio.wait_for(
            loop.run_in_executor(None, _get_hostname, str(ip)),
            timeout=0.200,
        )
    # Assume that a host name which is not retrieved within the first
    # 0.2 s does not exist.
    except asyncio.exceptions.TimeoutError:
        return "N/A"
async def get_ipwhois_result(loop, ip_address: str) -> dict:
    """Run an ipwhois ASN lookup for ``ip_address`` in the loop's executor.

    Args:
        loop: Event loop whose default executor runs the blocking lookup.
        ip_address: Address to look up (converted with ``str``).

    Returns:
        The dictionary returned by ``IPASN.lookup()``, or ``{}`` when the
        lookup raises ``ASNRegistryError`` or does not complete within
        0.2 seconds.

    Raises:
        RFC18: If ipwhois rejects the address with ``IPDefinedError``.
    """

    def _get_ipwhois_result(_ip_address: str) -> dict:
        try:
            return IPASN(Net(_ip_address)).lookup()
        except ipwhois.exceptions.ASNRegistryError:
            return {}
        except ipwhois.exceptions.IPDefinedError as e:
            # Fill in the placeholder after the message went through _(),
            # so the user sees the address instead of "{ip_address}".
            raise RFC18(
                _(
                    "IP address {ip_address} is already defined as Private-Use"
                    " Networks via RFC 1918."
                ).format(ip_address=_ip_address)
            ) from e

    try:
        return await asyncio.wait_for(
            loop.run_in_executor(None, _get_ipwhois_result, str(ip_address)),
            timeout=0.200,
        )
    except asyncio.exceptions.TimeoutError:
        return {}
async def get_ipinfo_result(apikey: str, ip_address: str) -> dict:
    """Fetch ipinfo details for ``ip_address``.

    Args:
        apikey: Access token passed to ``ipinfo.getHandlerAsync``.
        ip_address: Address to look up.

    Returns:
        The ``all`` attribute of the details object, or ``{}`` when the
        request raises ``RequestQuotaExceededError``.
    """
    handler = ipinfo.getHandlerAsync(apikey, request_options={"timeout": 7})

    try:
        details = await handler.getDetails(ip_address)
    except RequestQuotaExceededError:
        return {}
    finally:
        # A fresh handler is created per call; release its HTTP session so
        # it is not left open.
        await handler.deinit()

    return details.all
async def get_crimeflare_result(
    session: aiohttp.ClientSession, ip_address: str
) -> Optional[str]:
    """Query the crimeflare search CGI and extract an IPv4 address.

    Args:
        session: aiohttp session used to send the POST request.
        ip_address: Value sent as the ``cfS`` field of the request body.

    Returns:
        The first dotted-quad found in the response text, or ``None`` when
        none is found, the request fails with ``aiohttp.ClientError``, or
        it times out (15 seconds in total).
    """
    try:
        async with session.post(
            "http://www.crimeflare.org:82/cgi-bin/cfsearch.cgi",
            data=f"cfS={ip_address}",
            timeout=aiohttp.ClientTimeout(total=15),
        ) as response:
            # Require 1-3 digits per octet: a ``\d*`` based pattern also
            # matches bare dots such as "..." in the page text.
            found = re.search(
                r"\b\d{1,3}(?:\.\d{1,3}){3}\b", await response.text()
            )
    except (aiohttp.ClientError, asyncio.exceptions.TimeoutError):
        return None

    return found.group() if found else None
def merge_ipinfo_ipwhois(ipinfo_result: dict, ipwhois_result: dict) -> dict:
    """Combine ipinfo and ipwhois lookups into display-ready strings.

    ``ipinfo_result`` takes precedence when it is non-empty; otherwise
    ``ipwhois_result`` is used. Missing keys fall back to ``"N/A"``.

    Args:
        ipinfo_result: Mapping read for ``org``, ``city``, ``region`` and
            ``country``.
        ipwhois_result: Mapping read for ``asn``, ``asn_description``,
            ``asn_country_code`` and ``asn_registry``.

    Returns:
        A dict with the keys ``belongs``, ``rir``, ``region`` and ``flag``;
        every value is ``"N/A"`` when both inputs are empty.
    """
    output = {"belongs": "N/A", "rir": "N/A", "region": "N/A", "flag": "N/A"}

    if ipinfo_result:
        org = ipinfo_result.get("org", "N/A")
        org_words = org.split()
        # The first word of ``org`` is used as the ASN only when more words
        # follow it.
        asn = org_words[0] if len(org_words) > 1 else "N/A"
        # Read with the same fallback as the region line, so that a missing
        # country no longer raises KeyError when building the flag URL.
        country = ipinfo_result.get("country", "N/A")

        output["belongs"] = f"[{org}](https://bgp.he.net/{asn})"
        output["rir"] = f"```{ipwhois_result.get('asn_registry', 'N/A')}```"
        output["region"] = (
            f"```{ipinfo_result.get('city', 'N/A')} - "
            f"{ipinfo_result.get('region', 'N/A')} "
            f"({country})```"
        )
        output["flag"] = f"https://www.countryflags.io/{country}/shiny/64.png"
    elif ipwhois_result:
        org = ipwhois_result.get("asn_description", "N/A")
        asn = ipwhois_result.get("asn", "N/A")
        asn_country = ipwhois_result.get("asn_country_code", "N/A")

        output["belongs"] = f"{org} ([AS{asn}](https://bgp.he.net/{asn}))"
        # Same fallback as the ipinfo branch instead of a raw subscript.
        output["rir"] = f"```{ipwhois_result.get('asn_registry', 'N/A')}```"
        output["region"] = f"```{asn_country}```"
        output[
            "flag"
        ] = f"https://www.countryflags.io/{asn_country}/shiny/64.png"

    return output
async def get_pydig_result(
    domain: str, query_type: str, dnssec: str | bool
) -> list:
    """Resolve ``domain`` through pydig using two fixed name servers.

    Args:
        domain: Name to query.
        query_type: Record type passed to ``Resolver.query``.
        dnssec: Only the literal ``False`` disables DNSSEC; any other value
            adds ``+dnssec`` to the resolver's additional arguments.

    Returns:
        Whatever ``Resolver.query`` returns for the query.
    """
    additional_args = [] if dnssec is False else ["+dnssec"]

    resolver = pydig.Resolver(
        nameservers=[
            "80.67.169.40",
            "80.67.169.12",
        ],
        additional_args=additional_args,
    )

    # ``Resolver.query`` is a synchronous call; run it in the default
    # executor so the event loop keeps running during the lookup, as the
    # other lookups in this module do.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, resolver.query, domain, query_type)
def check_ip_version_or_raise(version: str) -> bool | NoReturn:
if version in ("4", "6", "None"):
return True
raise InvalidIp(_("Invalid ip version"))
def check_query_type_or_raise(query_type: str) -> bool | NoReturn:
query_types = (
"a",
"aaaa",
"cname",
"ns",
"ds",
"dnskey",
"soa",
"txt",
"ptr",
"mx",
)
if query_type in query_types:
return True
raise InvalidQueryType(
_(
"Supported queries : A, AAAA, CNAME, NS, DS, DNSKEY, SOA, TXT, PTR, MX"
)
)