-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathip_searcher.py
More file actions
177 lines (140 loc) · 6.63 KB
/
ip_searcher.py
File metadata and controls
177 lines (140 loc) · 6.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from __future__ import annotations
import itertools
import socket
from pathlib import Path
from typing import Dict, List, Optional, Union
import poptrie
class PoptrieError(Exception):
    """Root exception type for failures reported by this Poptrie wrapper."""
class IpSearcher:
    """
    IP membership and country lookup on top of ``poptrie.IpSearcher``.

    Accepts IPv4 and IPv6 addresses and maps the backend's 16-bit country
    codes (two ASCII letters packed big-endian, e.g. ``"CN"`` -> ``0x434E``)
    to ISO country strings.

    NOTE(review): the original author states that the Rust backend handles
    concurrent access to the mmap, making this class thread-safe. That cannot
    be confirmed from this file. The only Python-side shared state is the
    lazily built country map, which is rebuilt identically if two threads
    race on it.
    """

    def __init__(self, bin_path: Union[str, Path]) -> None:
        """
        Open a pre-built binary database.

        Args:
            bin_path: Path to the .dat/.bin file generated by BinBuilder.

        Raises:
            FileNotFoundError: If ``bin_path`` does not exist.
            PoptrieError: If the backend fails to load the file. The backend
                exception is attached as ``__cause__``.
        """
        self.path = Path(bin_path)
        if not self.path.exists():
            raise FileNotFoundError(f"Binary file not found: {self.path}")

        try:
            self._searcher = poptrie.IpSearcher(str(self.path))
        except Exception as exc:
            # Chain explicitly so the backend failure stays visible to callers.
            raise PoptrieError(f"Failed to load Poptrie database: {exc}") from exc

        # Code -> country table; built on first access via ``country_map``.
        self._country_map: Optional[Dict[int, str]] = None
        # Cached so ``is_china`` does not re-encode "CN" on every call.
        self._cn_code = self._country_code_to_u16("CN")

    @property
    def country_map(self) -> Dict[int, str]:
        """Mapping of 16-bit codes to two-letter country strings (built lazily)."""
        if self._country_map is None:
            self._country_map = self._build_country_map()
        return self._country_map

    @staticmethod
    def _build_country_map() -> Dict[int, str]:
        """Return a table covering every two-letter A-Z combination (676 entries)."""
        letters = range(ord("A"), ord("Z") + 1)
        return {
            (high << 8) | low: f"{chr(high)}{chr(low)}"
            for high, low in itertools.product(letters, repeat=2)
        }

    @staticmethod
    def _country_code_to_u16(country_code: str) -> int:
        """
        Pack a two-character country code into 16 bits ('CN' -> 0x434E).

        The code is upper-cased first. Returns 0 when the input is not exactly
        two ASCII characters.
        """
        # The ASCII check must happen before upper(): case mapping can expand
        # non-ASCII characters (e.g. "ß" -> "SS"), which would silently yield
        # the code of a different country, and non-ASCII code points do not
        # fit in one byte each.
        if len(country_code) != 2 or not country_code.isascii():
            return 0
        first, second = country_code.upper()
        return (ord(first) << 8) | ord(second)

    def _get_country_str(self, code: int) -> Optional[str]:
        """Translate a 16-bit code to its string; None for 0 or unknown codes."""
        if code <= 0:
            return None
        return self.country_map.get(code)

    @staticmethod
    def _pack_ip(ip: str) -> bytes:
        """
        Convert an IP string to packed bytes (4 bytes for IPv4, 16 for IPv6).

        Raises:
            ValueError: If ``ip`` parses as neither IPv4 nor IPv6.
        """
        # IPv4 is tried first, then IPv6.
        for family in (socket.AF_INET, socket.AF_INET6):
            try:
                return socket.inet_pton(family, ip)
            except OSError:
                continue
        # Raised outside the handlers with the context suppressed: the two
        # intermediate OSErrors add nothing beyond this message.
        raise ValueError(f"Invalid IP address format: {ip}") from None

    # --- Single Lookup API ---

    def __contains__(self, ip: str) -> bool:
        """Usage: ``'1.2.3.4' in searcher``. Any lookup failure counts as False."""
        try:
            return self.lookup(ip)
        except Exception:
            # Deliberately best-effort: membership tests never raise, so
            # malformed input or a backend error reads as "not present".
            return False

    def lookup(self, ip: str) -> bool:
        """
        Return True if the IP exists in the database.

        Raises:
            ValueError: If ``ip`` is not a valid IPv4/IPv6 string.
        """
        return self._searcher.contains_ip(self._pack_ip(ip))

    def get_country(self, ip: str) -> Optional[str]:
        """
        Return the ISO country code (e.g. 'CN'), or None if not found.

        Raises:
            ValueError: If ``ip`` is not a valid IPv4/IPv6 string.
        """
        code = self._searcher.lookup_code(self._pack_ip(ip))
        return self._get_country_str(code)

    def is_china(self, ip: str) -> bool:
        """
        Return True if the backend's code for ``ip`` equals the code for 'CN'.

        Raises:
            ValueError: If ``ip`` is not a valid IPv4/IPv6 string.
        """
        return self._searcher.lookup_code(self._pack_ip(ip)) == self._cn_code

    # --- Batch Lookup API (String Based) ---

    def batch_lookup(self, ips: List[str]) -> List[bool]:
        """
        Check many IP strings in one backend call.

        The strings are passed to the backend unparsed.
        NOTE(review): the original docstring says this is parallelized via
        Rust Rayon; not verifiable from this file.
        """
        return self._searcher.contains_strings(ips)

    def batch_get_countries(self, ips: List[str]) -> List[Optional[str]]:
        """Look up countries for many IP strings; None for 0 or unknown codes."""
        codes = self._searcher.lookup_codes_strings(ips)
        return [self._get_country_str(code) for code in codes]

    # --- Expert API (Packed Bytes / Stream Processing) ---

    def lookup_fast(self, packed_ips: bytes, is_v6: bool = False) -> List[bool]:
        """
        Membership check on raw packed addresses.

        Args:
            packed_ips: Concatenated packed addresses (4 bytes each for IPv4,
                16 bytes each for IPv6).
            is_v6: True when ``packed_ips`` holds IPv6 addresses.
        """
        return self._searcher.contains_packed(packed_ips, is_v6)

    def get_countries_fast(self, packed_ips: bytes, is_v6: bool = False) -> List[Optional[str]]:
        """
        Country lookup on raw packed addresses.

        Args:
            packed_ips: Concatenated packed addresses (4 bytes each for IPv4,
                16 bytes each for IPv6).
            is_v6: True when ``packed_ips`` holds IPv6 addresses.

        Returns:
            One entry per address; None for 0 or unknown codes.
        """
        codes = self._searcher.lookup_codes_packed(packed_ips, is_v6)
        return [self._get_country_str(code) for code in codes]
if __name__ == "__main__":
    # Demo / smoke test of every public IpSearcher entry point against a
    # pre-built database file.
    import sys  # local import: only this CLI entry point needs it

    # Database path: first CLI argument if given, otherwise the default name.
    db_path = sys.argv[1] if len(sys.argv) > 1 else "bgp-geoip.dat"
    if not Path(db_path).exists():
        print(f"Error: Database file '{db_path}' not found.")
        print("Please build it first using 'python build_bin.py'.")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under `python -S`).
        sys.exit(1)

    searcher = IpSearcher(db_path)
    print("=== Poptrie IpSearcher Industrial API Demo ===")

    # 1. Single IP Lookups
    test_ips = ["1.0.1.1", "8.8.8.8", "240e::1"]
    print("\n[1] Single IP Lookups:")
    for ip in test_ips:
        exists = ip in searcher
        country = searcher.get_country(ip)
        is_cn = searcher.is_china(ip)
        print(f" - {ip:15} | Exists: {str(exists):5} | Country: {str(country):4} | Is China: {is_cn}")

    # 2. Batch String Lookups (Parallelized)
    print("\n[2] Batch String Lookups (Rayon Parallel):")
    batch_ips = ["1.0.1.2", "8.8.4.4", "2001:4860:4860::8888", "114.114.114.114"]
    results = searcher.batch_lookup(batch_ips)
    countries = searcher.batch_get_countries(batch_ips)
    for ip, res, country in zip(batch_ips, results, countries):
        print(f" - {ip:25} | Found: {str(res):5} | Country: {country}")

    # 3. Expert Mode: Packed Bytes (IPv4)
    print("\n[3] Expert Mode: Packed Bytes (IPv4):")
    v4_list = ["1.0.1.5", "110.16.0.1"]
    # Each IPv4 address packs to 4 bytes; the backend takes them concatenated.
    packed_v4 = b"".join(socket.inet_pton(socket.AF_INET, ip) for ip in v4_list)
    v4_results = searcher.lookup_fast(packed_v4, is_v6=False)
    v4_countries = searcher.get_countries_fast(packed_v4, is_v6=False)
    for ip, found, country in zip(v4_list, v4_results, v4_countries):
        print(f" - {ip:15} | Found: {str(found):5} | Country: {country}")

    # 4. Expert Mode: Packed Bytes (IPv6)
    print("\n[4] Expert Mode: Packed Bytes (IPv6):")
    v6_list = ["240e:c000::", "2001:da8::1"]
    # Each IPv6 address packs to 16 bytes.
    packed_v6 = b"".join(socket.inet_pton(socket.AF_INET6, ip) for ip in v6_list)
    v6_results = searcher.lookup_fast(packed_v6, is_v6=True)
    v6_countries = searcher.get_countries_fast(packed_v6, is_v6=True)
    for ip, found, country in zip(v6_list, v6_results, v6_countries):
        print(f" - {ip:25} | Found: {str(found):5} | Country: {country}")

    print("\nDemo Complete.")