1+ import aiohttp
2+ import ipaddress
3+ import socket
4+ import sys
5+
6+ from contextlib import closing , suppress
17from core import Port , PortType
2- from typing import Iterable
8+ from typing import Iterable , TYPE_CHECKING
9+
10+ if TYPE_CHECKING :
11+ from core import Node
12+
13+ __all__ = [
14+ "is_open" ,
15+ "get_public_ip" ,
16+ "is_upnp_available" ,
17+ "generate_firewall_rules"
18+ ]
19+
20+ API_URLS = [
21+ 'https://api4.ipify.org/' ,
22+ 'https://ipinfo.io/ip' ,
23+ 'https://www.trackip.net/ip' ,
24+ 'https://api4.my-ip.io/ip' # they have an issue with their cert atm, hope they get it fixed
25+ ]
26+
27+
28+ def is_open (ip , port ):
29+ with closing (socket .socket (socket .AF_INET , socket .SOCK_STREAM )) as s :
30+ s .settimeout (1.0 )
31+ return s .connect_ex ((ip , int (port ))) == 0
32+
33+
34+ async def get_public_ip (node : "Node | None" = None ):
35+ for url in API_URLS :
36+ with suppress (aiohttp .ClientError , ValueError ):
37+ async with aiohttp .ClientSession () as session :
38+ async with session .get (url , proxy = node .proxy if node else None ,
39+ proxy_auth = node .proxy_auth if node else None ) as resp :
40+ return ipaddress .ip_address (await resp .text ()).compressed
41+ else :
42+ raise TimeoutError ("Public IP could not be retrieved." )
343
4- __all__ = ["generate_firewall_rules" ]
44+
45+ if sys .version_info >= (3 , 14 ):
46+
47+ def is_upnp_available () -> bool :
48+ from upnpy import UPnP
49+
50+ try :
51+ upnp = UPnP ()
52+ devices = upnp .discover ()
53+ if not devices :
54+ return False
55+
56+ # Look for an InternetGatewayDevice and its WANIPConnection (or WANPPPConnection) service
57+ for device in devices :
58+ if "InternetGatewayDevice" in (device .device_type or "" ):
59+ try :
60+ # Try WANIPConnection first
61+ wan_services = device .get_services ()
62+ has_wan = any (
63+ ("WANIPConnection" in s .service_type ) or ("WANPPPConnection" in s .service_type )
64+ for s in wan_services
65+ )
66+ if has_wan :
67+ return True
68+ except Exception :
69+ continue
70+ return False
71+ except Exception :
72+ return False
73+
74+ else :
75+
76+ def is_upnp_available () -> bool :
77+ import miniupnpc
78+
79+ try :
80+ upnp = miniupnpc .UPnP ()
81+ devices = upnp .discover () # Discover UPnP-enabled devices
82+ if devices > 0 :
83+ if upnp .selectigd ():
84+ # UPnP is enabled and an IGD was found.
85+ return True
86+ else :
87+ # UPnP is enabled, but no Internet Gateway Device (IGD) is selected
88+ return False
89+ else :
90+ # No UPnP devices detected on the network.
91+ return False
92+ except Exception :
93+ # A UPnP device was found, but no IGD was found.
94+ return False
595
696
797def fw_rule (port : int , protocol : str , name : str , description : str ) -> str :
@@ -20,6 +110,7 @@ def fw_rule(port: int, protocol: str, name: str, description: str) -> str:
20110 )
21111 return cmd
22112
113+
23114def generate_firewall_rules (ports : Iterable [Port ]) -> str :
24115 """
25116 Write a PowerShell script that adds inbound rules for the given ports.
@@ -37,7 +128,7 @@ def generate_firewall_rules(ports: Iterable[Port]) -> str:
37128 for p in ports :
38129 if p .typ is PortType .BOTH :
39130 # Create two separate rules
40- for proto in ( PortType .TCP , PortType .UDP ) :
131+ for proto in [ PortType .TCP , PortType .UDP ] :
41132 name = f"Allow { p .port } /{ proto .value .lower ()} "
42133 desc = f"Auto‑generated rule for inbound { p .port } /{ proto .value .lower ()} "
43134 lines .append (fw_rule (p .port , proto .value , name , desc ))
0 commit comments