feat(commands:iplocalise|Network): add way to ask for map

This commit is contained in:
Romain J 2021-04-24 22:34:53 +02:00
parent b9f6c6cb0a
commit 2a00d93023
5 changed files with 110 additions and 23 deletions

View file

@ -25,6 +25,7 @@
<w>ipwhois</w>
<w>jishaku</w>
<w>langue</w>
<w>latlon</w>
<w>levelname</w>
<w>liste</w>
<w>localiseip</w>

View file

@ -7,6 +7,7 @@ HAS_MODELS = False
class NetworkConfig(Structure):
ipinfoKey: str = StrField("")
geoapifyKey: str = StrField("")
extra: Dict[str, Dict] = {
@ -14,4 +15,8 @@ extra: Dict[str, Dict] = {
"type": str,
"description": "API Key for ipinfo.io (.iplocalise command)",
},
"geoapifyKey": {
"type": str,
"description": "API Key for geoapify.com (.iplocalise command)",
},
}

View file

@ -30,12 +30,24 @@ class QueryTypeConverter(commands.Converter):
return argument.lower()
class IPVersionConverter(commands.Converter):
class IPParamsConverter(commands.Converter):
async def convert(self, ctx: Context, argument: str): # skipcq: PYL-W0613
if not argument:
return argument
return None
return argument.replace("-", "").replace("ip", "").replace("v", "")
params = {
"inet": "",
"map": "map" in argument.lower(),
}
if "4" in argument:
params["inet"] = "4"
elif "6" in argument:
params["inet"] = "6"
elif len(arg := argument.split(" ")) >= 2:
params["inet"] = arg[0]
return params
class ASConverter(commands.Converter):

View file

@ -1,3 +1,4 @@
import io
import socket
from typing import NoReturn, Optional
@ -35,12 +36,13 @@ def _(x):
cache=Cache.MEMORY,
namespace="network",
)
async def get_ip(loop, ip: str, inet: str = "") -> str:
async def get_ip(loop, ip: str, inet: Optional[dict]) -> str:
_inet: socket.AddressFamily | int = 0 # pylint: disable=no-member
if inet == "6":
if inet:
if inet["inet"] == "6":
_inet = socket.AF_INET6
elif inet == "4":
elif inet["inet"] == "4":
_inet = socket.AF_INET
def _get_ip(_ip: str):
@ -153,7 +155,13 @@ async def get_crimeflare_result(ip: str) -> Optional[str]:
def merge_ipinfo_ipwhois(ipinfo_result: dict, ipwhois_result: dict) -> dict:
output = {"belongs": "N/A", "rir": "N/A", "region": "N/A", "flag": "N/A"}
output = {
"belongs": "N/A",
"rir": "N/A",
"region": "N/A",
"flag": "N/A",
"map": "N/A",
}
if ipinfo_result:
org = ipinfo_result.get("org", "N/A")
@ -166,10 +174,10 @@ def merge_ipinfo_ipwhois(ipinfo_result: dict, ipwhois_result: dict) -> dict:
f"{ipinfo_result.get('region', 'N/A')} "
f"({ipinfo_result.get('country', 'N/A')})```"
)
output["flag"] = (
f"https://www.countryflags.io/{ipinfo_result['country']}"
f"/shiny/64.png"
)
output[
"flag"
] = f"https://flagcdn.com/144x108/{ipinfo_result['country'].lower()}.png"
output["map"] = ipinfo_result["loc"]
elif ipwhois_result:
org = ipwhois_result.get("asn_description", "N/A")
asn = ipwhois_result.get("asn", "N/A")
@ -180,11 +188,45 @@ def merge_ipinfo_ipwhois(ipinfo_result: dict, ipwhois_result: dict) -> dict:
output["region"] = f"```{asn_country}```"
output[
"flag"
] = f"https://www.countryflags.io/{asn_country}/shiny/64.png"
] = f"https://flagcdn.com/144x108/{asn_country.lower()}.png"
return output
@cached(
ttl=24 * 3600,
serializer=PickleSerializer(),
cache=Cache.MEMORY,
namespace="network",
)
async def get_map_bytes(apikey: str, latlon: str) -> Optional[io.BytesIO]:
if latlon == "N/A":
return None
url = (
"https://maps.geoapify.com/v1/staticmap"
"?style=osm-carto"
"&width=400"
"&height=300"
"&center=lonlat:{lonlat}"
"&zoom=12"
"&marker=lonlat:{lonlat};color:%23ff0000;size:small"
"&pitch=42"
"&apiKey={apikey}"
)
lonlat = ",".join(latlon.split(",")[::-1])
url = url.format(lonlat=lonlat, apikey=apikey)
async with aiohttp.ClientSession() as cs:
async with cs.get(url) as s:
if s.status != 200:
return None
return io.BytesIO(await s.read())
@cached(
ttl=24 * 3600,
serializer=PickleSerializer(),
@ -257,8 +299,8 @@ async def get_peeringdb_net_result(asn: str) -> dict:
return {"data": []}
def check_ip_version_or_raise(version: str) -> bool | NoReturn:
if version in ("4", "6", "None"):
def check_ip_version_or_raise(version: Optional[dict]) -> bool | NoReturn:
if version is None or version["inet"] in ("4", "6", ""):
return True
raise InvalidIp(_("Invalid ip version"))

View file

@ -13,7 +13,7 @@ from ipinfo.exceptions import RequestQuotaExceededError
from structured_config import ConfigFile
from tuxbot.cogs.Network.functions.converters import (
IPConverter,
IPVersionConverter,
IPParamsConverter,
DomainConverter,
QueryTypeConverter,
ASConverter,
@ -43,6 +43,7 @@ from .functions.utils import (
get_crimeflare_result,
get_ipinfo_result,
get_ipwhois_result,
get_map_bytes,
get_pydig_result,
get_peeringdb_net_result,
merge_ipinfo_ipwhois,
@ -89,7 +90,7 @@ class Network(commands.Cog):
async def _update_peering_db(self):
await get_peeringdb_net_result(str(1))
logging.log(logging.INFO, "_update_peering_db")
log.log(logging.INFO, "_update_peering_db")
self.bot.console.log("[Network]: _update_peering_db")
# =========================================================================
@ -100,11 +101,16 @@ class Network(commands.Cog):
self,
ctx: ContextPlus,
ip: IPConverter,
version: Optional[IPVersionConverter] = None,
*,
params: Optional[IPParamsConverter] = None,
):
check_ip_version_or_raise(str(version))
# noinspection PyUnresolvedReferences
check_ip_version_or_raise(params) # type: ignore
ip_address = await get_ip(self.bot.loop, str(ip), str(version))
# noinspection PyUnresolvedReferences
ip_address = await get_ip(
self.bot.loop, str(ip), params # type: ignore
)
ip_hostname = await get_hostname(self.bot.loop, str(ip_address))
@ -146,9 +152,30 @@ class Network(commands.Cog):
),
)
await ctx.send(embed=e)
kwargs: dict = {}
@command_extra(name="cloudflare", deletable=True)
# noinspection PyUnresolvedReferences
if (
params is not None
and params["map"]
and ( # type: ignore
map_bytes := await get_map_bytes(
self.__config.geoapifyKey, merged_results["map"]
)
)
):
file = discord.File(map_bytes, "map.png")
e.set_image(url="attachment://map.png")
kwargs["file"] = file
kwargs["embed"] = e
return await ctx.send(f"https://ipinfo.io/{ip_address}", **kwargs)
@command_extra(
name="cloudflare", aliases=["cf", "crimeflare"], deletable=True
)
async def _cloudflare(
self,
ctx: ContextPlus,