diff --git a/examples/upnp/README.md b/examples/upnp/README.md new file mode 100644 index 000000000..327f61a17 --- /dev/null +++ b/examples/upnp/README.md @@ -0,0 +1,24 @@ +# UPnP Example + +This example demonstrates how to use the integrated UPnP behaviour to automatically create a port mapping on your local network's gateway (e.g., your home router). + +This allows peers from the public internet to directly dial your node, which is a crucial step for NAT traversal. + +## Usage + +First, ensure you have installed the necessary dependencies from the root of the repository: + +```sh +pip install -e ".[upnp]" +``` + +Then, run the script in a terminal: + +```sh +python examples/upnp/upnp_demo.py +``` + +The script will start a libp2p host and immediately try to discover a UPnP-enabled gateway on the network. + +- If it **succeeds**, it will print the new external address that has been mapped. +- If it **fails** (e.g., your router doesn't have UPnP enabled or you're behind a double NAT), it will print a descriptive error message. diff --git a/examples/upnp/upnp_demo.py b/examples/upnp/upnp_demo.py new file mode 100644 index 000000000..5094f18f4 --- /dev/null +++ b/examples/upnp/upnp_demo.py @@ -0,0 +1,45 @@ +import argparse +import logging + +import multiaddr +import trio + +from libp2p import new_host + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("upnp-demo") + + +async def run(port: int) -> None: + listen_maddr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + host = new_host(enable_upnp=True) + + async with host.run(listen_addrs=[listen_maddr]): + try: + logger.info(f"Host started with ID: {host.get_id().pretty()}") + logger.info(f"Listening on: {host.get_addrs()}") + logger.info("Host is running. Press Ctrl+C to shut down.") + logger.info("If UPnP discovery was successful, ports are now mapped.") + await trio.sleep_forever() + except KeyboardInterrupt: + logger.info("Shutting down...") + finally: + # UPnP teardown is automatic via host.run() + logger.info("Shutdown complete.") + + +def main() -> None: + parser = argparse.ArgumentParser(description="UPnP example for py-libp2p") + parser.add_argument( + "-p", "--port", type=int, default=0, help="Local TCP port (0 for random)" + ) + args = parser.parse_args() + + try: + trio.run(run, args.port) + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main() diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 542a71c1d..494221a50 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -251,6 +251,7 @@ def new_host( muxer_preference: Literal["YAMUX", "MPLEX"] | None = None, listen_addrs: Sequence[multiaddr.Multiaddr] | None = None, enable_mDNS: bool = False, + enable_upnp: bool = False, negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> IHost: """ @@ -277,6 +278,6 @@ def new_host( if disc_opt is not None: return RoutedHost(swarm, disc_opt, enable_mDNS) - return BasicHost(network=swarm,enable_mDNS=enable_mDNS , negotitate_timeout=negotiate_timeout) + return BasicHost(network=swarm, enable_mDNS=enable_mDNS, negotiate_timeout=negotiate_timeout, enable_upnp=enable_upnp) __version__ = __version("libp2p") diff --git a/libp2p/discovery/upnp.py b/libp2p/discovery/upnp.py new file mode 100644 index 000000000..e80feef2b --- /dev/null +++ b/libp2p/discovery/upnp.py @@ -0,0 +1,139 @@ +import ipaddress +import logging + +try: + import miniupnpc # type: ignore[import-error] +except ImportError: + miniupnpc = None +import trio + +logger = logging.getLogger("libp2p.discovery.upnp") + + +class UpnpManager: + """ + A simple, self-contained manager for UPnP port mapping that can be used + alongside a libp2p Host. + """ + + def __init__(self) -> None: + if miniupnpc is None: + raise RuntimeError( + "UPnP support requires the miniupnpc library; " + "install with `pip install libp2p[upnp]`" + ) + self._gateway = miniupnpc.UPnP() + self._lan_addr: str | None = None + self._external_ip: str | None = None + + async def discover(self) -> bool: + """ + Discover the UPnP IGD on the network. + + :return: True if a gateway is found, False otherwise. + """ + logger.debug("Discovering UPnP gateway...") + try: + try: + num_devices = await trio.to_thread.run_sync(self._gateway.discover) + except Exception as e: + # The miniupnpc library has a known quirk where `discover()` can + # raise an exception with the message "Success" on some platforms + # (e.g., Windows) instead of returning a number of devices. We treat + # this as a successful discovery of 1 device. + if str(e) == "Success": # type: ignore + num_devices = 1 + else: + logger.exception("UPnP discovery exception") + return False + + if num_devices > 0: + await trio.to_thread.run_sync(self._gateway.selectigd) + self._lan_addr = self._gateway.lanaddr + self._external_ip = await trio.to_thread.run_sync( + self._gateway.externalipaddress + ) + logger.debug(f"UPnP gateway found: {self._external_ip}") + + if self._external_ip is None: + logger.error("Gateway did not return an external IP address") + return False + + ip_obj = ipaddress.ip_address(self._external_ip) + if ip_obj.is_private: + logger.warning( + "UPnP gateway has a private IP; you may be behind a double NAT." + ) + return False + return True + else: + logger.debug("No UPnP devices found") + return False + except Exception: + logger.exception("UPnP discovery failed") + return False + + async def add_port_mapping(self, port: int, protocol: str = "TCP") -> bool: + """ + Request a new port mapping from the gateway. + + :param port: the internal port to map + :param protocol: the protocol to map (TCP or UDP) + :return: True on success, False otherwise + """ + if not 0 < port < 65536: + logger.error(f"Invalid port number for mapping: {port}") + return False + if port < 1024: + logger.warning( + f"Mapping a well-known (privileged) port ({port}) may fail or " + "require root." + ) + + if not self._lan_addr: + logger.error( + "Cannot add port mapping: discovery has not been run successfully." + ) + return False + + logger.debug(f"Requesting UPnP mapping for {protocol} port {port}...") + try: + await trio.to_thread.run_sync( + lambda: self._gateway.addportmapping( + port, protocol, self._lan_addr, port, "py-libp2p", "" + ) + ) + logger.info( + f"Successfully mapped external port {self._external_ip}:{port} " + f"to internal port {self._lan_addr}:{port}" + ) + return True + except Exception: + logger.exception(f"Failed to map port {port}") + return False + + async def remove_port_mapping(self, port: int, protocol: str = "TCP") -> bool: + """ + Remove an existing port mapping. + + :param port: the external port to unmap + :param protocol: the protocol (TCP or UDP) + :return: True on success, False otherwise + """ + if not 0 < port < 65536: + logger.error(f"Invalid port number for removal: {port}") + return False + + logger.debug(f"Removing UPnP mapping for {protocol} port {port}...") + try: + await trio.to_thread.run_sync( + lambda: self._gateway.deleteportmapping(port, protocol) + ) + logger.info(f"Successfully removed mapping for port {port}") + return True + except Exception: + logger.exception(f"Failed to remove mapping for port {port}") + return False + + def get_external_ip(self) -> str | None: + return self._external_ip diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index cc93be08b..77fee19f5 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -30,6 +30,7 @@ TProtocol, ) from libp2p.discovery.mdns.mdns import MDNSDiscovery +from libp2p.discovery.upnp import UpnpManager from libp2p.host.defaults import ( get_default_protocols, ) @@ -85,6 +86,9 @@ class BasicHost(IHost): _network: INetworkService peerstore: IPeerStore + mDNS: MDNSDiscovery | None + upnp: UpnpManager | None + multiselect: Multiselect multiselect_client: MultiselectClient @@ -92,20 +96,26 @@ def __init__( self, network: INetworkService, enable_mDNS: bool = False, + enable_upnp: bool = False, default_protocols: Optional["OrderedDict[TProtocol, StreamHandlerFn]"] = None, - negotitate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, + negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, ) -> None: self._network = network self._network.set_stream_handler(self._swarm_stream_handler) self.peerstore = self._network.peerstore - self.negotiate_timeout = negotitate_timeout + self.negotiate_timeout = negotiate_timeout # Protocol muxing default_protocols = default_protocols or get_default_protocols(self) self.multiselect = Multiselect(dict(default_protocols.items())) self.multiselect_client = MultiselectClient() + self.mDNS = None if enable_mDNS: self.mDNS = MDNSDiscovery(network) + self.upnp = None + if enable_upnp: + self.upnp = UpnpManager() + def get_id(self) -> ID: """ :return: peer_id of host @@ -172,11 +182,24 @@ async def _run() -> AsyncIterator[None]: if hasattr(self, "mDNS") and self.mDNS is not None: logger.debug("Starting mDNS Discovery") self.mDNS.start() + if self.upnp: + upnp_manager = self.upnp + logger.debug("Starting UPnP discovery and port mapping") + if await upnp_manager.discover(): + for addr in self.get_addrs(): + if port := addr.value_for_protocol("tcp"): + await upnp_manager.add_port_mapping(port, "TCP") try: yield finally: if hasattr(self, "mDNS") and self.mDNS is not None: self.mDNS.stop() + if self.upnp and self.upnp.get_external_ip(): + upnp_manager = self.upnp + logger.debug("Removing UPnP port mappings") + for addr in self.get_addrs(): + if port := addr.value_for_protocol("tcp"): + await upnp_manager.remove_port_mapping(port, "TCP") return _run() diff --git a/pyproject.toml b/pyproject.toml index 1b9589af2..d7fd5c289 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,6 +93,8 @@ test = [ "factory-boy>=2.12.0,<3.0.0", ] +upnp = ["miniupnpc>=2.0"] + [tool.setuptools] include-package-data = true diff --git a/tests/discovery/test_upnp.py b/tests/discovery/test_upnp.py new file mode 100644 index 000000000..d2349852b --- /dev/null +++ b/tests/discovery/test_upnp.py @@ -0,0 +1,154 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from libp2p.discovery.upnp import UpnpManager + +pytestmark = pytest.mark.trio + + +@pytest.fixture +def mock_upnp_gateway(): + """A pytest fixture to mock the miniupnpc.UPnP object.""" + with patch("libp2p.discovery.upnp.miniupnpc.UPnP") as mock_upnp_class: + mock_gateway = MagicMock() + mock_upnp_class.return_value = mock_gateway + yield mock_gateway + + +async def test_upnp_discover_success(mock_upnp_gateway): + """ + Test successful discovery of a UPnP gateway. + """ + mock_upnp_gateway.discover.return_value = 1 + mock_upnp_gateway.selectigd.return_value = None + mock_upnp_gateway.lanaddr = "192.168.1.100" + mock_upnp_gateway.externalipaddress.return_value = "123.45.67.89" + + manager = UpnpManager() + result = await manager.discover() + + assert result is True + assert manager.get_external_ip() == "123.45.67.89" + assert manager._lan_addr == "192.168.1.100" + mock_upnp_gateway.discover.assert_called_once() + mock_upnp_gateway.selectigd.assert_called_once() + mock_upnp_gateway.externalipaddress.assert_called_once() + + +async def test_upnp_discover_with_success_exception(mock_upnp_gateway): + """ + Test the workaround for the miniupnpc bug where it raises Exception("Success"). + """ + mock_upnp_gateway.discover.side_effect = Exception("Success") + mock_upnp_gateway.selectigd.return_value = None + mock_upnp_gateway.lanaddr = "192.168.1.100" + mock_upnp_gateway.externalipaddress.return_value = "123.45.67.89" + + manager = UpnpManager() + result = await manager.discover() + + assert result is True + assert manager.get_external_ip() == "123.45.67.89" + + +async def test_upnp_discover_no_devices_found(mock_upnp_gateway): + """ + Test UPnP discovery when no devices are found. + """ + mock_upnp_gateway.discover.return_value = 0 + manager = UpnpManager() + result = await manager.discover() + + assert result is False + assert manager.get_external_ip() is None + + +@patch("libp2p.discovery.upnp.logger") +async def test_upnp_discover_double_nat(mock_logger, mock_upnp_gateway): + """ + Test UPnP discovery when the external IP is private (double NAT). + """ + mock_upnp_gateway.discover.return_value = 1 + mock_upnp_gateway.selectigd.return_value = None + mock_upnp_gateway.lanaddr = "192.168.1.100" + mock_upnp_gateway.externalipaddress.return_value = "10.0.0.1" + + manager = UpnpManager() + result = await manager.discover() + + assert result is False + assert manager.get_external_ip() == "10.0.0.1" + mock_logger.warning.assert_called_once_with( + "UPnP gateway has a private IP; you may be behind a double NAT." + ) + + +async def test_add_port_mapping_success(mock_upnp_gateway): + """ + Test successfully adding a port mapping after discovery. + """ + mock_upnp_gateway.discover.return_value = 1 + mock_upnp_gateway.selectigd.return_value = None + mock_upnp_gateway.lanaddr = "192.168.1.100" + mock_upnp_gateway.externalipaddress.return_value = "123.45.67.89" + + manager = UpnpManager() + await manager.discover() + + mock_upnp_gateway.addportmapping.return_value = None + result = await manager.add_port_mapping(port=8080, protocol="TCP") + + assert result is True + mock_upnp_gateway.addportmapping.assert_called_once_with( + 8080, "TCP", "192.168.1.100", 8080, "py-libp2p", "" + ) + + +@patch("libp2p.discovery.upnp.logger") +async def test_add_port_mapping_failure_no_discover(mock_logger, mock_upnp_gateway): + """ + Test that adding a port mapping fails if discover() hasn't been run. + """ + manager = UpnpManager() + result = await manager.add_port_mapping(port=8080, protocol="TCP") + + assert result is False + mock_upnp_gateway.addportmapping.assert_not_called() + mock_logger.error.assert_called_once_with( + "Cannot add port mapping: discovery has not been run successfully." + ) + + +@patch("libp2p.discovery.upnp.logger") +async def test_add_port_mapping_exception(mock_logger, mock_upnp_gateway): + """ + Test adding a port mapping when the gateway raises an exception. + """ + mock_upnp_gateway.discover.return_value = 1 + mock_upnp_gateway.lanaddr = "192.168.1.100" + mock_upnp_gateway.externalipaddress.return_value = "123.45.67.89" + + manager = UpnpManager() + await manager.discover() + + mock_upnp_gateway.addportmapping.side_effect = Exception("Gateway rejected mapping") + result = await manager.add_port_mapping(port=8080, protocol="TCP") + + assert result is False + mock_logger.exception.assert_called_once_with("Failed to map port 8080") + + +async def test_remove_port_mapping_success(mock_upnp_gateway): + """ + Test successfully removing a port mapping. + """ + manager = UpnpManager() + manager._lan_addr = "192.168.1.100" + manager._external_ip = "123.45.67.89" + + mock_upnp_gateway.deleteportmapping.return_value = None + result = await manager.remove_port_mapping(port=8080, protocol="TCP") + + assert result is True + mock_upnp_gateway.deleteportmapping.assert_called_once_with(8080, "TCP")