Skip to content

✨ Feat: add Thin Waist address validation utilities and integrate into echo example #811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions examples/advanced/network_discover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Advanced demonstration of Thin Waist address handling.

Run:
python -m examples.advanced.network_discovery
"""

from __future__ import annotations

from multiaddr import Multiaddr

try:
from libp2p.utils.address_validation import (
get_available_interfaces,
expand_wildcard_address,
get_optimal_binding_address,
)
except ImportError:
# Fallbacks if utilities are missing
def get_available_interfaces(port: int, protocol: str = "tcp"):
return [Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")]

def expand_wildcard_address(addr: Multiaddr, port: int | None = None):
return [addr if port is None else Multiaddr(str(addr).rsplit("/", 1)[0] + f"/{port}")]

def get_optimal_binding_address(port: int, protocol: str = "tcp"):
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")


def main() -> None:
port = 8080
interfaces = get_available_interfaces(port)
print(f"Discovered interfaces for port {port}:")
for a in interfaces:
print(f" - {a}")

wildcard_v4 = Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
expanded_v4 = expand_wildcard_address(wildcard_v4)
print("\nExpanded IPv4 wildcard:")
for a in expanded_v4:
print(f" - {a}")

wildcard_v6 = Multiaddr(f"/ip6/::/tcp/{port}")
expanded_v6 = expand_wildcard_address(wildcard_v6)
print("\nExpanded IPv6 wildcard:")
for a in expanded_v6:
print(f" - {a}")

print("\nOptimal binding address heuristic result:")
print(f" -> {get_optimal_binding_address(port)}")

override_port = 9000
overridden = expand_wildcard_address(wildcard_v4, port=override_port)
print(f"\nPort override expansion to {override_port}:")
for a in overridden:
print(f" - {a}")


if __name__ == "__main__":
main()
10 changes: 8 additions & 2 deletions examples/echo/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
info_from_p2p_addr,
)

from libp2p.utils.address_validation import (
get_optimal_binding_address,
get_available_interfaces,
)

PROTOCOL_ID = TProtocol("/echo/1.0.0")
MAX_READ_LEN = 2**32 - 1

Expand All @@ -31,8 +36,9 @@ async def _echo_stream_handler(stream: INetStream) -> None:


async def run(port: int, destination: str, seed: int | None = None) -> None:
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")

# CHANGED: previously hardcoded 0.0.0.0
listen_addr = get_optimal_binding_address(port)

if seed:
import random

Expand Down
9 changes: 9 additions & 0 deletions libp2p/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
get_agent_version,
)

from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
expand_wildcard_address,
)

__all__ = [
"decode_uvarint_from_stream",
"encode_delim",
Expand All @@ -26,4 +32,7 @@
"decode_varint_from_bytes",
"decode_varint_with_size",
"read_length_prefixed_protobuf",
"get_available_interfaces",
"get_optimal_binding_address",
"expand_wildcard_address",
]
125 changes: 125 additions & 0 deletions libp2p/utils/address_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from __future__ import annotations
from typing import List, Optional
from multiaddr import Multiaddr

try:
from multiaddr.utils import get_thin_waist_addresses, get_network_addrs # type: ignore
_HAS_THIN_WAIST = True
except ImportError: # pragma: no cover - only executed in older environments
_HAS_THIN_WAIST = False
get_thin_waist_addresses = None # type: ignore
get_network_addrs = None # type: ignore


def _safe_get_network_addrs(ip_version: int) -> List[str]:
"""
Internal safe wrapper. Returns a list of IP addresses for the requested IP version.
Falls back to minimal defaults when Thin Waist helpers are missing.

:param ip_version: 4 or 6
"""
if _HAS_THIN_WAIST and get_network_addrs:
try:
return get_network_addrs(ip_version) or []
except Exception: # pragma: no cover - defensive
return []
# Fallback behavior (very conservative)
if ip_version == 4:
return ["127.0.0.1"]
if ip_version == 6:
return ["::1"]
return []


def _safe_expand(addr: Multiaddr, port: Optional[int] = None) -> List[Multiaddr]:
"""
Internal safe expansion wrapper. Returns a list of Multiaddr objects.
If Thin Waist isn't available, returns [addr] (identity).
"""
if _HAS_THIN_WAIST and get_thin_waist_addresses:
try:
if port is not None:
return get_thin_waist_addresses(addr, port=port) or []
return get_thin_waist_addresses(addr) or []
except Exception: # pragma: no cover - defensive
return [addr]
return [addr]


def get_available_interfaces(port: int, protocol: str = "tcp") -> List[Multiaddr]:
"""
Discover available network interfaces (IPv4 + IPv6 if supported) for binding.

:param port: Port number to bind to.
:param protocol: Transport protocol (e.g., "tcp" or "udp").
:return: List of Multiaddr objects representing candidate interface addresses.
"""
addrs: List[Multiaddr] = []

# IPv4 enumeration
for ip in _safe_get_network_addrs(4):
addrs.append(Multiaddr(f"/ip4/{ip}/{protocol}/{port}"))

# IPv6 enumeration (optional: only include if we have at least one global or loopback)
for ip in _safe_get_network_addrs(6):
# Avoid returning unusable wildcard expansions if the environment does not support IPv6
addrs.append(Multiaddr(f"/ip6/{ip}/{protocol}/{port}"))

# Fallback if nothing discovered
if not addrs:
addrs.append(Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}"))

return addrs


def expand_wildcard_address(addr: Multiaddr, port: Optional[int] = None) -> List[Multiaddr]:
"""
Expand a wildcard (e.g. /ip4/0.0.0.0/tcp/0) into all concrete interfaces.

:param addr: Multiaddr to expand.
:param port: Optional override for port selection.
:return: List of concrete Multiaddr instances.
"""
expanded = _safe_expand(addr, port=port)
if not expanded: # Safety fallback
return [addr]
return expanded


def get_optimal_binding_address(port: int, protocol: str = "tcp") -> Multiaddr:
"""
Choose an optimal address for an example to bind to:
- Prefer non-loopback IPv4
- Then non-loopback IPv6
- Fallback to loopback
- Fallback to wildcard

:param port: Port number.
:param protocol: Transport protocol.
:return: A single Multiaddr chosen heuristically.
"""
candidates = get_available_interfaces(port, protocol)

def is_non_loopback(ma: Multiaddr) -> bool:
s = str(ma)
return not ("/ip4/127." in s or "/ip6/::1" in s)

for c in candidates:
if "/ip4/" in str(c) and is_non_loopback(c):
return c
for c in candidates:
if "/ip6/" in str(c) and is_non_loopback(c):
return c
for c in candidates:
if "/ip4/127." in str(c) or "/ip6/::1" in str(c):
return c

# As a final fallback, produce a wildcard
return Multiaddr(f"/ip4/0.0.0.0/{protocol}/{port}")


__all__ = [
"get_available_interfaces",
"get_optimal_binding_address",
"expand_wildcard_address",
]
51 changes: 51 additions & 0 deletions tests/examples/test_echo_thin_waist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use Trio we don't use asyncio (for uniformity in all code base)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okh got it

import contextlib
import subprocess
import sys
import time
from pathlib import Path

import pytest

# This test is intentionally lightweight and can be marked as 'integration'.
# It ensures the echo example runs and prints the new Thin Waist lines.

EXAMPLES_DIR = Path(__file__).parent.parent.parent / "examples" / "echo"


@pytest.mark.timeout(20)
def test_echo_example_starts_and_prints_thin_waist(monkeypatch, tmp_path):
# We run: python examples/echo/echo.py -p 0
cmd = [sys.executable, str(EXAMPLES_DIR / "echo.py"), "-p", "0"]
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
assert proc.stdout is not None

found_selected = False
found_interfaces = False
start = time.time()

try:
while time.time() - start < 10:
line = proc.stdout.readline()
if not line:
time.sleep(0.1)
continue
if "Selected binding address:" in line:
found_selected = True
if "Available candidate interfaces:" in line:
found_interfaces = True
if "Waiting for incoming connections..." in line:
break
finally:
with contextlib.suppress(ProcessLookupError):
proc.terminate()
with contextlib.suppress(ProcessLookupError):
proc.kill()

assert found_selected, "Did not capture Thin Waist binding log line"
assert found_interfaces, "Did not capture Thin Waist interfaces log line"
56 changes: 56 additions & 0 deletions tests/utils/test_address_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import os

import pytest
from multiaddr import Multiaddr

from libp2p.utils.address_validation import (
get_available_interfaces,
get_optimal_binding_address,
expand_wildcard_address,
)


@pytest.mark.parametrize("proto", ["tcp"])
def test_get_available_interfaces(proto: str) -> None:
interfaces = get_available_interfaces(0, protocol=proto)
assert len(interfaces) > 0
for addr in interfaces:
assert isinstance(addr, Multiaddr)
assert f"/{proto}/" in str(addr)


def test_get_optimal_binding_address() -> None:
addr = get_optimal_binding_address(0)
assert isinstance(addr, Multiaddr)
# At least IPv4 or IPv6 prefix present
s = str(addr)
assert ("/ip4/" in s) or ("/ip6/" in s)


def test_expand_wildcard_address_ipv4() -> None:
wildcard = Multiaddr("/ip4/0.0.0.0/tcp/0")
expanded = expand_wildcard_address(wildcard)
assert len(expanded) > 0
for e in expanded:
assert isinstance(e, Multiaddr)
assert "/tcp/" in str(e)


def test_expand_wildcard_address_port_override() -> None:
wildcard = Multiaddr("/ip4/0.0.0.0/tcp/7000")
overridden = expand_wildcard_address(wildcard, port=9001)
assert len(overridden) > 0
for e in overridden:
assert str(e).endswith("/tcp/9001")


@pytest.mark.skipif(
os.environ.get("NO_IPV6") == "1",
reason="Environment disallows IPv6",
)
def test_expand_wildcard_address_ipv6() -> None:
wildcard = Multiaddr("/ip6/::/tcp/0")
expanded = expand_wildcard_address(wildcard)
assert len(expanded) > 0
for e in expanded:
assert "/ip6/" in str(e)