diff --git a/.idea/dictionaries/romain.xml b/.idea/dictionaries/romain.xml index 4d91983..711748e 100644 --- a/.idea/dictionaries/romain.xml +++ b/.idea/dictionaries/romain.xml @@ -25,6 +25,7 @@ ipwhois jishaku langue + latlon levelname liste localiseip diff --git a/tuxbot/cogs/Network/config.py b/tuxbot/cogs/Network/config.py index 114e890..cb06577 100644 --- a/tuxbot/cogs/Network/config.py +++ b/tuxbot/cogs/Network/config.py @@ -7,6 +7,7 @@ HAS_MODELS = False class NetworkConfig(Structure): ipinfoKey: str = StrField("") + geoapifyKey: str = StrField("") extra: Dict[str, Dict] = { @@ -14,4 +15,8 @@ extra: Dict[str, Dict] = { "type": str, "description": "API Key for ipinfo.io (.iplocalise command)", }, + "geoapifyKey": { + "type": str, + "description": "API Key for geoapify.com (.iplocalise command)", + }, } diff --git a/tuxbot/cogs/Network/functions/converters.py b/tuxbot/cogs/Network/functions/converters.py index fdd5857..3cb8cf8 100644 --- a/tuxbot/cogs/Network/functions/converters.py +++ b/tuxbot/cogs/Network/functions/converters.py @@ -30,12 +30,24 @@ class QueryTypeConverter(commands.Converter): return argument.lower() -class IPVersionConverter(commands.Converter): +class IPParamsConverter(commands.Converter): async def convert(self, ctx: Context, argument: str): # skipcq: PYL-W0613 if not argument: - return argument + return None - return argument.replace("-", "").replace("ip", "").replace("v", "") + params = { + "inet": "", + "map": "map" in argument.lower(), + } + + if "4" in argument: + params["inet"] = "4" + elif "6" in argument: + params["inet"] = "6" + elif len(arg := argument.split(" ")) >= 2: + params["inet"] = arg[0] + + return params class ASConverter(commands.Converter): diff --git a/tuxbot/cogs/Network/functions/utils.py b/tuxbot/cogs/Network/functions/utils.py index bb21667..321697e 100644 --- a/tuxbot/cogs/Network/functions/utils.py +++ b/tuxbot/cogs/Network/functions/utils.py @@ -1,3 +1,4 @@ +import io import socket from typing import NoReturn, Optional @@ -35,13 +36,14 @@ def _(x): cache=Cache.MEMORY, namespace="network", ) -async def get_ip(loop, ip: str, inet: str = "") -> str: +async def get_ip(loop, ip: str, inet: Optional[dict]) -> str: _inet: socket.AddressFamily | int = 0 # pylint: disable=no-member - if inet == "6": - _inet = socket.AF_INET6 - elif inet == "4": - _inet = socket.AF_INET + if inet: + if inet["inet"] == "6": + _inet = socket.AF_INET6 + elif inet["inet"] == "4": + _inet = socket.AF_INET def _get_ip(_ip: str): try: @@ -153,7 +155,13 @@ async def get_crimeflare_result(ip: str) -> Optional[str]: def merge_ipinfo_ipwhois(ipinfo_result: dict, ipwhois_result: dict) -> dict: - output = {"belongs": "N/A", "rir": "N/A", "region": "N/A", "flag": "N/A"} + output = { + "belongs": "N/A", + "rir": "N/A", + "region": "N/A", + "flag": "N/A", + "map": "N/A", + } if ipinfo_result: org = ipinfo_result.get("org", "N/A") @@ -166,10 +174,10 @@ def merge_ipinfo_ipwhois(ipinfo_result: dict, ipwhois_result: dict) -> dict: f"{ipinfo_result.get('region', 'N/A')} " f"({ipinfo_result.get('country', 'N/A')})```" ) - output["flag"] = ( - f"https://www.countryflags.io/{ipinfo_result['country']}" - f"/shiny/64.png" - ) + output[ + "flag" + ] = f"https://flagcdn.com/144x108/{ipinfo_result['country'].lower()}.png" + output["map"] = ipinfo_result["loc"] elif ipwhois_result: org = ipwhois_result.get("asn_description", "N/A") asn = ipwhois_result.get("asn", "N/A") @@ -180,11 +188,45 @@ def merge_ipinfo_ipwhois(ipinfo_result: dict, ipwhois_result: dict) -> dict: output["region"] = f"```{asn_country}```" output[ "flag" - ] = f"https://www.countryflags.io/{asn_country}/shiny/64.png" + ] = f"https://flagcdn.com/144x108/{asn_country.lower()}.png" return output +@cached( + ttl=24 * 3600, + serializer=PickleSerializer(), + cache=Cache.MEMORY, + namespace="network", +) +async def get_map_bytes(apikey: str, latlon: str) -> Optional[io.BytesIO]: + if latlon == "N/A": + return None + + url = ( + "https://maps.geoapify.com/v1/staticmap" + "?style=osm-carto" + "&width=400" + "&height=300" + "¢er=lonlat:{lonlat}" + "&zoom=12" + "&marker=lonlat:{lonlat};color:%23ff0000;size:small" + "&pitch=42" + "&apiKey={apikey}" + ) + + lonlat = ",".join(latlon.split(",")[::-1]) + + url = url.format(lonlat=lonlat, apikey=apikey) + + async with aiohttp.ClientSession() as cs: + async with cs.get(url) as s: + if s.status != 200: + return None + + return io.BytesIO(await s.read()) + + @cached( ttl=24 * 3600, serializer=PickleSerializer(), @@ -257,8 +299,8 @@ async def get_peeringdb_net_result(asn: str) -> dict: return {"data": []} -def check_ip_version_or_raise(version: str) -> bool | NoReturn: - if version in ("4", "6", "None"): +def check_ip_version_or_raise(version: Optional[dict]) -> bool | NoReturn: + if version is None or version["inet"] in ("4", "6", ""): return True raise InvalidIp(_("Invalid ip version")) diff --git a/tuxbot/cogs/Network/network.py b/tuxbot/cogs/Network/network.py index 55dcab1..7255223 100644 --- a/tuxbot/cogs/Network/network.py +++ b/tuxbot/cogs/Network/network.py @@ -13,7 +13,7 @@ from ipinfo.exceptions import RequestQuotaExceededError from structured_config import ConfigFile from tuxbot.cogs.Network.functions.converters import ( IPConverter, - IPVersionConverter, + IPParamsConverter, DomainConverter, QueryTypeConverter, ASConverter, @@ -43,6 +43,7 @@ from .functions.utils import ( get_crimeflare_result, get_ipinfo_result, get_ipwhois_result, + get_map_bytes, get_pydig_result, get_peeringdb_net_result, merge_ipinfo_ipwhois, @@ -89,7 +90,7 @@ class Network(commands.Cog): async def _update_peering_db(self): await get_peeringdb_net_result(str(1)) - logging.log(logging.INFO, "_update_peering_db") + log.log(logging.INFO, "_update_peering_db") self.bot.console.log("[Network]: _update_peering_db") # ========================================================================= @@ -100,11 +101,16 @@ class Network(commands.Cog): self, ctx: ContextPlus, ip: IPConverter, - version: Optional[IPVersionConverter] = None, + *, + params: Optional[IPParamsConverter] = None, ): - check_ip_version_or_raise(str(version)) + # noinspection PyUnresolvedReferences + check_ip_version_or_raise(params) # type: ignore - ip_address = await get_ip(self.bot.loop, str(ip), str(version)) + # noinspection PyUnresolvedReferences + ip_address = await get_ip( + self.bot.loop, str(ip), params # type: ignore + ) ip_hostname = await get_hostname(self.bot.loop, str(ip_address)) @@ -146,9 +152,30 @@ class Network(commands.Cog): ), ) - await ctx.send(embed=e) + kwargs: dict = {} - @command_extra(name="cloudflare", deletable=True) + # noinspection PyUnresolvedReferences + if ( + params is not None + and params["map"] + and ( # type: ignore + map_bytes := await get_map_bytes( + self.__config.geoapifyKey, merged_results["map"] + ) + ) + ): + file = discord.File(map_bytes, "map.png") + e.set_image(url="attachment://map.png") + + kwargs["file"] = file + + kwargs["embed"] = e + + return await ctx.send(f"https://ipinfo.io/{ip_address}", **kwargs) + + @command_extra( + name="cloudflare", aliases=["cf", "crimeflare"], deletable=True + ) async def _cloudflare( self, ctx: ContextPlus,