11#!/usr/bin/env python3
2- # -*- coding: utf-8 -*-
2+
3+ from __future__ import annotations
34
45import json
56import requests
67
7- from typing import Dict , Any , Optional
8+ from importlib .metadata import version
9+ from typing import Dict , Any , Optional , List
810from urllib .parse import urljoin , urlparse
911
1012import ipaddress
1113
14+ from urllib3 .util import Retry
15+ from requests .adapters import HTTPAdapter
16+
1217
1318class IPASNHistory ():
1419
15- def __init__ (self , root_url : str = 'https://ipasnhistory.circl.lu/' ) :
20+ def __init__ (self , root_url : str = 'https://ipasnhistory.circl.lu/' , useragent : str | None = None ) -> None :
1621 self .root_url = root_url
1722 if not urlparse (self .root_url ).scheme :
1823 self .root_url = 'http://' + self .root_url
1924 if not self .root_url .endswith ('/' ):
2025 self .root_url += '/'
2126 self .session = requests .session ()
27+ self .session .headers ['user-agent' ] = useragent if useragent else f'PyIPASNHIstory / { version ("pyipasnhistory" )} '
28+ retries = Retry (total = 5 , backoff_factor = 0.1 , status_forcelist = [500 , 502 , 503 , 504 ])
29+ self .session .mount ('http://' , HTTPAdapter (max_retries = retries ))
2230
2331 @property
24- def is_up (self ):
25- r = self .session .head (self .root_url )
32+ def is_up (self ) -> bool :
33+ '''Test if the given instance is accessible'''
34+ try :
35+ r = self .session .head (self .root_url )
36+ except requests .exceptions .ConnectionError :
37+ return False
2638 return r .status_code == 200
2739
2840 def meta (self ):
2941 '''Get meta information from the remote instance'''
3042 r = requests .get (urljoin (self .root_url , 'meta' ))
3143 return r .json ()
3244
33- def mass_cache (self , list_to_cache : list ):
45+ def mass_cache (self , list_to_cache : list [ dict [ str , Any ]] ):
3446 '''Cache a list of IP queries. The next call on the same IPs will be very quick.'''
3547 to_query = []
3648 for entry in list_to_cache :
3749 if 'precision_delta' in entry :
3850 entry ['precision_delta' ] = json .dumps (entry .pop ('precision_delta' ))
3951 to_query .append (entry )
4052
41- r = self .session .post (urljoin (self .root_url , 'mass_cache' ), data = json . dumps ( to_query ) )
53+ r = self .session .post (urljoin (self .root_url , 'mass_cache' ), json = to_query )
4254 return r .json ()
4355
44- def mass_query (self , list_to_query : list ) :
56+ def mass_query (self , list_to_query : list [ dict [ str , Any ]]) -> dict [ str , Any ] :
4557 '''Query a list of IPs.'''
4658 to_query = []
4759 for entry in list_to_query :
4860 if 'precision_delta' in entry :
4961 entry ['precision_delta' ] = json .dumps (entry .pop ('precision_delta' ))
5062 to_query .append (entry )
51- r = self .session .post (urljoin (self .root_url , 'mass_query' ), data = json . dumps ( to_query ) )
63+ r = self .session .post (urljoin (self .root_url , 'mass_query' ), json = to_query )
5264 return r .json ()
5365
54- def asn_meta (self , asn : Optional [ int ] = None , source : Optional [ str ] = None , address_family : str = 'v4' ,
55- date : Optional [ str ] = None , first : Optional [ str ] = None , last : Optional [ str ] = None ,
56- precision_delta : Optional [ Dict ] = None ):
66+ def asn_meta (self , asn : int | None = None , source : str | None = None , address_family : str = 'v4' ,
67+ date : str | None = None , first : str | None = None , last : str | None = None ,
68+ precision_delta : dict | None = None ):
5769 '''Get all the prefixes annonced by an AS'''
58- to_query : Dict [str , Any ] = {'address_family' : address_family }
70+ to_query : dict [str , Any ] = {'address_family' : address_family }
5971 if source :
6072 to_query ['source' ] = source
6173 if asn :
@@ -69,10 +81,10 @@ def asn_meta(self, asn: Optional[int]=None, source: Optional[str]=None, address_
6981 if precision_delta :
7082 to_query ['precision_delta' ] = json .dumps (precision_delta )
7183
72- r = self .session .post (urljoin (self .root_url , 'asn_meta' ), data = json . dumps ( to_query ) )
84+ r = self .session .post (urljoin (self .root_url , 'asn_meta' ), json = to_query )
7385 return r .json ()
7486
75- def _aggregate_details (self , details : dict ):
87+ def _aggregate_details (self , details : dict ) -> list :
7688 '''Aggregare the response when the asn/prefix tuple is the same over a period of time.'''
7789 to_return = []
7890 current = None
@@ -91,9 +103,9 @@ def _aggregate_details(self, details: dict):
91103 to_return .append (current )
92104 return to_return
93105
94- def query (self , ip : str , source : Optional [ str ] = None , address_family : Optional [ str ] = None ,
95- date : Optional [ str ] = None , first : Optional [ str ] = None , last : Optional [ str ] = None ,
96- precision_delta : Optional [ Dict ] = None , aggregate : bool = False ):
106+ def query (self , ip : str , source : str | None = None , address_family : str | None = None ,
107+ date : str | None = None , first : str | None = None , last : str | None = None ,
108+ precision_delta : dict | None = None , aggregate : bool = False ):
97109 '''Launch a query.
98110
99111 :param ip: IP to lookup
0 commit comments