diff --git a/RANDOM_WALK_README.md b/RANDOM_WALK_README.md new file mode 100644 index 000000000..027620521 --- /dev/null +++ b/RANDOM_WALK_README.md @@ -0,0 +1,345 @@ +# Random Walk Module Implementation for py-libp2p + +This document describes the implementation of the Random Walk module for py-libp2p, which provides automatic peer discovery and routing table maintenance for Kademlia DHT nodes. + +## Overview + +The Random Walk module implements a peer discovery mechanism similar to go-libp2p's implementation. It performs random walks through the DHT network to discover new peers and maintain routing table health through periodic refreshes. + +## Architecture + +### Components + +1. **RandomWalk** (`libp2p/routing_table/random_walk.py`) + + - Generates random peer IDs for discovery queries + - Performs FIND_NODE queries to discover peers + - Validates discovered peers + +1. **RTRefreshManager** (`libp2p/routing_table/rt_refresh_manager.py`) + + - Manages periodic routing table refreshes + - Coordinates random walk operations + - Handles peer liveness checks and eviction + +1. **Configuration** (`libp2p/routing_table/config.py`) + + - Centralized configuration constants + - Compatible with go-libp2p defaults + +1. **Exceptions** (`libp2p/routing_table/exceptions.py`) + + - Custom exception types for error handling + +### Integration Flow + +``` +Kademlia DHT → RT Refresh Manager → Random Walk → Peer Discovery → Routing Table Update +``` + +## Features + +### Random Walk + +- Generates cryptographically secure random 256-bit peer IDs +- Performs concurrent random walks for efficiency +- Validates discovered peers before adding to routing table +- Removes duplicate peers automatically + +### RT Refresh Manager + +- Automatic periodic routing table refreshes +- Manual refresh triggering capability +- Peer liveness checking and eviction +- Configurable refresh intervals and thresholds + +### Go-libp2p Compatibility + +- Uses same random peer ID generation (256-bit) +- Compatible refresh logic and timing +- Same DHT protocol messages +- Maintains protocol compatibility + +## Configuration + +### Default Settings (matching go-libp2p) + +```python +# Timing constants +REFRESH_QUERY_TIMEOUT = 60.0 # seconds +REFRESH_INTERVAL = 300.0 # 5 minutes +SUCCESSFUL_OUTBOUND_QUERY_GRACE_PERIOD = 60.0 # 1 minute + +# Routing table thresholds +MIN_RT_REFRESH_THRESHOLD = 4 # Minimum peers before triggering refresh +MAX_N_BOOTSTRAPPERS = 2 # Maximum bootstrap peers to try + +# Random walk specific +RANDOM_WALK_CONCURRENCY = 3 # Number of concurrent random walks +RANDOM_WALK_ENABLED = True # Enable automatic random walks +``` + +### Customization + +All settings can be customized when creating the RTRefreshManager: + +```python +rt_refresh_manager = RTRefreshManager( + host=host, + routing_table=routing_table, + local_peer_id=local_peer_id, + query_function=query_function, + enable_auto_refresh=True, + refresh_interval=300.0, # Custom interval + min_refresh_threshold=4, # Custom threshold +) +``` + +## Usage + +### Basic Integration with Kademlia DHT + +The Random Walk module is automatically integrated with the Kademlia DHT. When you create a KadDHT instance, it will automatically: + +1. Create and configure the RT Refresh Manager +1. Start automatic random walks (if enabled) +1. Maintain routing table health + +```python +from libp2p.kad_dht.kad_dht import KadDHT, DHTMode + +# Create DHT - Random Walk is automatically enabled +dht = KadDHT(host=host, mode=DHTMode.SERVER) + +# Start DHT - this also starts the RT Refresh Manager +await dht.run() +``` + +### Manual Refresh Triggering + +You can manually trigger routing table refreshes: + +```python +# Trigger refresh respecting timing constraints +await dht.trigger_routing_table_refresh() + +# Force refresh regardless of timing +await dht.trigger_routing_table_refresh(force=True) +``` + +### Standalone Usage + +You can also use the Random Walk module independently: + +```python +from libp2p.routing_table.random_walk import RandomWalk + +# Create standalone Random Walk instance +random_walk = RandomWalk( + host=host, + local_peer_id=local_peer_id, + query_function=query_function, + validation_function=validation_function, + ping_function=ping_function, +) + +# Perform single random walk +peers = await random_walk.perform_random_walk() + +# Perform concurrent random walks +peers = await random_walk.run_concurrent_random_walks(count=3) +``` + +## API Reference + +### RandomWalk Class + +#### Constructor + +```python +RandomWalk( + host: IHost, + local_peer_id: ID, + query_function: Callable[[str], AsyncContextManager[List[PeerInfo]]], + validation_function: Optional[Callable[[PeerInfo], AsyncContextManager[bool]]] = None, + ping_function: Optional[Callable[[ID], AsyncContextManager[bool]]] = None, +) +``` + +#### Methods + +- `generate_random_peer_id() -> str`: Generate random 256-bit peer ID +- `validate_peer(peer_info: PeerInfo) -> bool`: Validate discovered peer +- `perform_random_walk() -> List[PeerInfo]`: Perform single random walk +- `run_concurrent_random_walks(count: int = 3) -> List[PeerInfo]`: Run concurrent walks + +### RTRefreshManager Class + +#### Constructor + +```python +RTRefreshManager( + host: IHost, + routing_table: RoutingTableProtocol, + local_peer_id: ID, + query_function: Callable[[str], AsyncContextManager[List[PeerInfo]]], + ping_function: Optional[Callable[[ID], AsyncContextManager[bool]]] = None, + validation_function: Optional[Callable[[PeerInfo], AsyncContextManager[bool]]] = None, + enable_auto_refresh: bool = True, + refresh_interval: float = 300.0, + min_refresh_threshold: int = 4, +) +``` + +#### Methods + +- `start() -> None`: Start the refresh manager +- `stop() -> None`: Stop the refresh manager +- `trigger_refresh(force: bool = False) -> None`: Trigger manual refresh +- `add_refresh_done_callback(callback: Callable[[], None]) -> None`: Add completion callback +- `remove_refresh_done_callback(callback: Callable[[], None]) -> None`: Remove callback + +## Error Handling + +The module provides custom exception types: + +- `RoutingTableRefreshError`: Base exception for refresh operations +- `RandomWalkError`: Specific to random walk failures +- `PeerValidationError`: Peer validation failures + +All operations include proper error handling and logging: + +```python +try: + peers = await random_walk.perform_random_walk() +except RandomWalkError as e: + logger.error(f"Random walk failed: {e}") + # Handle gracefully +``` + +## Logging + +The module uses structured logging with appropriate log levels: + +- `INFO`: Important operations and results +- `DEBUG`: Detailed operation progress +- `WARNING`: Recoverable issues +- `ERROR`: Operation failures + +Log names: + +- `libp2p.routing_table.random_walk` +- `libp2p.routing_table.rt_refresh_manager` + +## Performance Considerations + +### Concurrency + +- Uses trio nurseries for concurrent operations +- Default 3 concurrent random walks +- Configurable concurrency levels + +### Rate Limiting + +- Respects configured timeouts +- Implements backoff for failed operations +- Prevents resource exhaustion + +### Memory Management + +- Removes duplicate peers automatically +- Evicts unresponsive peers +- Bounded routing table size + +## Testing + +### Unit Tests + +Run the basic test suite: + +```bash +python3 tests/routing_table/test_random_walk.py +``` + +### Demo + +Run the comprehensive demonstration: + +```bash +python3 examples/random_walk_demo.py +``` + +The demo shows: + +- Random peer ID generation +- Single and concurrent random walks +- RT Refresh Manager operation +- Peer discovery and validation + +## Dependencies + +The implementation uses only standard dependencies: + +- Standard Python libraries (secrets, time, logging) +- Existing py-libp2p modules +- trio for async operations +- typing for type hints + +No additional external dependencies are required. + +## Compatibility + +### Protocol Compatibility + +- Compatible with go-libp2p DHT nodes +- Uses standard Kademlia DHT messages +- Same peer ID format (256-bit) +- Compatible refresh behavior + +### Version Compatibility + +- Python 3.8+ +- py-libp2p 0.1.0+ +- trio 0.20.0+ + +## Future Enhancements + +Potential improvements for future versions: + +1. **Metrics and Monitoring** + + - Add Prometheus metrics + - Performance statistics + - Health monitoring + +1. **Advanced Configuration** + + - Per-network configuration profiles + - Dynamic parameter adjustment + - A/B testing capabilities + +1. **Optimization** + + - Smart peer selection algorithms + - Adaptive refresh intervals + - Network condition awareness + +1. **Additional Features** + + - Peer reputation tracking + - Geographic diversity + - Network topology awareness + +## Contributing + +When contributing to the Random Walk module: + +1. Follow existing code style and patterns +1. Add comprehensive tests for new features +1. Update documentation for API changes +1. Ensure go-libp2p compatibility +1. Include performance considerations + +## License + +This implementation follows the same license as py-libp2p (MIT/Apache 2.0). diff --git a/docs/examples.random_walk.rst b/docs/examples.random_walk.rst new file mode 100644 index 000000000..baa3f81f2 --- /dev/null +++ b/docs/examples.random_walk.rst @@ -0,0 +1,131 @@ +Random Walk Example +=================== + +This example demonstrates the Random Walk module's peer discovery capabilities using real libp2p hosts and Kademlia DHT. +It shows how the Random Walk module automatically discovers new peers and maintains routing table health. + +The Random Walk implementation performs the following key operations: + +* **Automatic Peer Discovery**: Generates random peer IDs and queries the DHT network to discover new peers +* **Routing Table Maintenance**: Periodically refreshes the routing table to maintain network connectivity +* **Connection Management**: Maintains optimal connections to healthy peers in the network +* **Real-time Statistics**: Displays routing table size, connected peers, and peerstore statistics + +.. code-block:: console + + $ python -m pip install libp2p + Collecting libp2p + ... + Successfully installed libp2p-x.x.x + $ cd examples/random_walk + $ python random_walk.py --mode server + 2025-08-12 19:51:25,424 - random-walk-example - INFO - === Random Walk Example for py-libp2p === + 2025-08-12 19:51:25,424 - random-walk-example - INFO - Mode: server, Port: 0 Demo interval: 30s + 2025-08-12 19:51:25,426 - random-walk-example - INFO - Starting server node on port 45123 + 2025-08-12 19:51:25,426 - random-walk-example - INFO - Node peer ID: 16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef + 2025-08-12 19:51:25,426 - random-walk-example - INFO - Node address: /ip4/0.0.0.0/tcp/45123/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef + 2025-08-12 19:51:25,427 - random-walk-example - INFO - Initial routing table size: 0 + 2025-08-12 19:51:25,427 - random-walk-example - INFO - DHT service started in SERVER mode + 2025-08-12 19:51:25,430 - libp2p.discovery.random_walk.rt_refresh_manager - INFO - RT Refresh Manager started + 2025-08-12 19:51:55,432 - random-walk-example - INFO - --- Iteration 1 --- + 2025-08-12 19:51:55,432 - random-walk-example - INFO - Routing table size: 15 + 2025-08-12 19:51:55,432 - random-walk-example - INFO - Connected peers: 8 + 2025-08-12 19:51:55,432 - random-walk-example - INFO - Peerstore size: 42 + +You can also run the example in client mode: + +.. code-block:: console + + $ python random_walk.py --mode client + 2025-08-12 19:52:15,424 - random-walk-example - INFO - === Random Walk Example for py-libp2p === + 2025-08-12 19:52:15,424 - random-walk-example - INFO - Mode: client, Port: 0 Demo interval: 30s + 2025-08-12 19:52:15,426 - random-walk-example - INFO - Starting client node on port 51234 + 2025-08-12 19:52:15,426 - random-walk-example - INFO - Node peer ID: 16Uiu2HAmAbc123xyz... + 2025-08-12 19:52:15,427 - random-walk-example - INFO - DHT service started in CLIENT mode + 2025-08-12 19:52:45,432 - random-walk-example - INFO - --- Iteration 1 --- + 2025-08-12 19:52:45,432 - random-walk-example - INFO - Routing table size: 8 + 2025-08-12 19:52:45,432 - random-walk-example - INFO - Connected peers: 5 + 2025-08-12 19:52:45,432 - random-walk-example - INFO - Peerstore size: 25 + +Command Line Options +-------------------- + +The example supports several command-line options: + +.. code-block:: console + + $ python random_walk.py --help + usage: random_walk.py [-h] [--mode {server,client}] [--port PORT] + [--demo-interval DEMO_INTERVAL] [--verbose] + + Random Walk Example for py-libp2p Kademlia DHT + + optional arguments: + -h, --help show this help message and exit + --mode {server,client} + Node mode: server (DHT server), or client (DHT client) + --port PORT Port to listen on (0 for random) + --demo-interval DEMO_INTERVAL + Interval between random walk demonstrations in seconds + --verbose Enable verbose logging + +Key Features Demonstrated +------------------------- + +**Automatic Random Walk Discovery** + The example shows how the Random Walk module automatically: + + * Generates random 256-bit peer IDs for discovery queries + * Performs concurrent random walks to maximize peer discovery + * Validates discovered peers and adds them to the routing table + * Maintains routing table health through periodic refreshes + +**Real-time Network Statistics** + The example displays live statistics every 30 seconds (configurable): + + * **Routing Table Size**: Number of peers in the Kademlia routing table + * **Connected Peers**: Number of actively connected peers + * **Peerstore Size**: Total number of known peers with addresses + +**Connection Management** + The example includes sophisticated connection management: + + * Automatically maintains connections to healthy peers + * Filters for compatible peers (TCP + IPv4 addresses) + * Reconnects to maintain optimal network connectivity + * Handles connection failures gracefully + +**DHT Integration** + Shows seamless integration between Random Walk and Kademlia DHT: + + * RT Refresh Manager coordinates with the DHT routing table + * Peer discovery feeds directly into DHT operations + * Both SERVER and CLIENT modes supported + * Bootstrap connectivity to public IPFS nodes + +Understanding the Output +------------------------ + +When you run the example, you'll see periodic statistics that show how the Random Walk module is working: + +* **Initial Phase**: Routing table starts empty and quickly discovers peers +* **Growth Phase**: Routing table size increases as more peers are discovered +* **Maintenance Phase**: Routing table size stabilizes as the system maintains optimal peer connections + +The Random Walk module runs automatically in the background, performing peer discovery queries every few minutes to ensure the routing table remains populated with fresh, reachable peers. + +Configuration +------------- + +The Random Walk module can be configured through the following parameters in ``libp2p.discovery.random_walk.config``: + +* ``RANDOM_WALK_ENABLED``: Enable/disable automatic random walks (default: True) +* ``REFRESH_INTERVAL``: Time between automatic refreshes in seconds (default: 300) +* ``RANDOM_WALK_CONCURRENCY``: Number of concurrent random walks (default: 3) +* ``MIN_RT_REFRESH_THRESHOLD``: Minimum routing table size before triggering refresh (default: 4) + +See Also +-------- + +* :doc:`examples.kademlia` - Kademlia DHT value storage and content routing +* :doc:`libp2p.discovery.random_walk` - Random Walk module API documentation diff --git a/docs/examples.rst b/docs/examples.rst index b20d0e63d..b8ba44d70 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -14,3 +14,4 @@ Examples examples.circuit_relay examples.kademlia examples.mDNS + examples.random_walk diff --git a/docs/libp2p.discovery.random_walk.rst b/docs/libp2p.discovery.random_walk.rst new file mode 100644 index 000000000..1cd7702c2 --- /dev/null +++ b/docs/libp2p.discovery.random_walk.rst @@ -0,0 +1,48 @@ +libp2p.discovery.random_walk package +==================================== + +The Random Walk module implements a peer discovery mechanism. +It performs random walks through the DHT network to discover new peers and maintain routing table health through periodic refreshes. + +Submodules +---------- + +libp2p.discovery.random_walk.config module +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. automodule:: libp2p.discovery.random_walk.config + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.random_walk.exceptions module +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. automodule:: libp2p.discovery.random_walk.exceptions + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.random_walk.random_walk module +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. automodule:: libp2p.discovery.random_walk.random_walk + :members: + :undoc-members: + :show-inheritance: + +libp2p.discovery.random_walk.rt_refresh_manager module +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. automodule:: libp2p.discovery.random_walk.rt_refresh_manager + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.discovery.random_walk + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/libp2p.discovery.rst b/docs/libp2p.discovery.rst index 508ca059d..4b8120888 100644 --- a/docs/libp2p.discovery.rst +++ b/docs/libp2p.discovery.rst @@ -10,6 +10,7 @@ Subpackages libp2p.discovery.bootstrap libp2p.discovery.events libp2p.discovery.mdns + libp2p.discovery.random_walk Submodules ---------- diff --git a/examples/kademlia/kademlia.py b/examples/kademlia/kademlia.py index 00c7915ae..5daa70d76 100644 --- a/examples/kademlia/kademlia.py +++ b/examples/kademlia/kademlia.py @@ -227,7 +227,7 @@ async def run_node( # Keep the node running while True: - logger.debug( + logger.info( "Status - Connected peers: %d," "Peers in store: %d, Values in store: %d", len(dht.host.get_connected_peers()), diff --git a/examples/random_walk/random_walk.py b/examples/random_walk/random_walk.py new file mode 100644 index 000000000..845ccd57b --- /dev/null +++ b/examples/random_walk/random_walk.py @@ -0,0 +1,221 @@ +""" +Random Walk Example for py-libp2p Kademlia DHT + +This example demonstrates the Random Walk module's peer discovery capabilities +using real libp2p hosts and Kademlia DHT. It shows how the Random Walk module +automatically discovers new peers and maintains routing table health. + +Usage: + # Start server nodes (they will discover peers via random walk) + python3 random_walk.py --mode server +""" + +import argparse +import logging +import random +import secrets +import sys + +from multiaddr import Multiaddr +import trio + +from libp2p import new_host +from libp2p.abc import IHost +from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p.kad_dht.kad_dht import DHTMode, KadDHT +from libp2p.tools.async_service import background_trio_service + + +# Simple logging configuration +def setup_logging(verbose: bool = False): + """Setup unified logging configuration.""" + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler()], + ) + + # Configure key module loggers + for module in ["libp2p.discovery.random_walk", "libp2p.kad_dht"]: + logging.getLogger(module).setLevel(level) + + # Suppress noisy logs + logging.getLogger("multiaddr").setLevel(logging.WARNING) + + +logger = logging.getLogger("random-walk-example") + +# Default bootstrap nodes +DEFAULT_BOOTSTRAP_NODES = [ + "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ" +] + + +def filter_compatible_peer_info(peer_info) -> bool: + """Filter peer info to check if it has compatible addresses (TCP + IPv4).""" + if not hasattr(peer_info, "addrs") or not peer_info.addrs: + return False + + for addr in peer_info.addrs: + addr_str = str(addr) + if "/tcp/" in addr_str and "/ip4/" in addr_str and "/quic" not in addr_str: + return True + return False + + +async def maintain_connections(host: IHost) -> None: + """Maintain connections to ensure the host remains connected to healthy peers.""" + while True: + try: + connected_peers = host.get_connected_peers() + list_peers = host.get_peerstore().peers_with_addrs() + + if len(connected_peers) < 20: + logger.debug("Reconnecting to maintain peer connections...") + + # Find compatible peers + compatible_peers = [] + for peer_id in list_peers: + try: + peer_info = host.get_peerstore().peer_info(peer_id) + if filter_compatible_peer_info(peer_info): + compatible_peers.append(peer_id) + except Exception: + continue + + # Connect to random subset of compatible peers + if compatible_peers: + random_peers = random.sample( + compatible_peers, min(50, len(compatible_peers)) + ) + for peer_id in random_peers: + if peer_id not in connected_peers: + try: + with trio.move_on_after(5): + peer_info = host.get_peerstore().peer_info(peer_id) + await host.connect(peer_info) + logger.debug(f"Connected to peer: {peer_id}") + except Exception as e: + logger.debug(f"Failed to connect to {peer_id}: {e}") + + await trio.sleep(15) + except Exception as e: + logger.error(f"Error maintaining connections: {e}") + + +async def demonstrate_random_walk_discovery(dht: KadDHT, interval: int = 30) -> None: + """Demonstrate Random Walk peer discovery with periodic statistics.""" + iteration = 0 + while True: + iteration += 1 + logger.info(f"--- Iteration {iteration} ---") + logger.info(f"Routing table size: {dht.get_routing_table_size()}") + logger.info(f"Connected peers: {len(dht.host.get_connected_peers())}") + logger.info(f"Peerstore size: {len(dht.host.get_peerstore().peer_ids())}") + await trio.sleep(interval) + + +async def run_node(port: int, mode: str, demo_interval: int = 30) -> None: + """Run a node that demonstrates Random Walk peer discovery.""" + try: + if port <= 0: + port = random.randint(10000, 60000) + + logger.info(f"Starting {mode} node on port {port}") + + # Determine DHT mode + dht_mode = DHTMode.SERVER if mode == "server" else DHTMode.CLIENT + + # Create host and DHT + key_pair = create_new_key_pair(secrets.token_bytes(32)) + host = new_host(key_pair=key_pair, bootstrap=DEFAULT_BOOTSTRAP_NODES) + listen_addr = Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + # Start maintenance tasks + nursery.start_soon(host.get_peerstore().start_cleanup_task, 60) + nursery.start_soon(maintain_connections, host) + + peer_id = host.get_id().pretty() + logger.info(f"Node peer ID: {peer_id}") + logger.info(f"Node address: /ip4/0.0.0.0/tcp/{port}/p2p/{peer_id}") + + # Create and start DHT with Random Walk enabled + dht = KadDHT(host, dht_mode, enable_random_walk=True) + logger.info(f"Initial routing table size: {dht.get_routing_table_size()}") + + async with background_trio_service(dht): + logger.info(f"DHT service started in {dht_mode.value} mode") + logger.info(f"Random Walk enabled: {dht.is_random_walk_enabled()}") + + async with trio.open_nursery() as task_nursery: + # Start demonstration and status reporting + task_nursery.start_soon( + demonstrate_random_walk_discovery, dht, demo_interval + ) + + # Periodic status updates + async def status_reporter(): + while True: + await trio.sleep(30) + logger.debug( + f"Connected: {len(dht.host.get_connected_peers())}, " + f"Routing table: {dht.get_routing_table_size()}, " + f"Peerstore: {len(dht.host.get_peerstore().peer_ids())}" + ) + + task_nursery.start_soon(status_reporter) + await trio.sleep_forever() + + except Exception as e: + logger.error(f"Node error: {e}", exc_info=True) + sys.exit(1) + + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Random Walk Example for py-libp2p Kademlia DHT", + ) + parser.add_argument( + "--mode", + choices=["server", "client"], + default="server", + help="Node mode: server (DHT server), or client (DHT client)", + ) + parser.add_argument( + "--port", type=int, default=0, help="Port to listen on (0 for random)" + ) + parser.add_argument( + "--demo-interval", + type=int, + default=30, + help="Interval between random walk demonstrations in seconds", + ) + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + return parser.parse_args() + + +def main(): + """Main entry point for the random walk example.""" + try: + args = parse_args() + setup_logging(args.verbose) + + logger.info("=== Random Walk Example for py-libp2p ===") + logger.info( + f"Mode: {args.mode}, Port: {args.port} Demo interval: {args.demo_interval}s" + ) + + trio.run(run_node, args.port, args.mode, args.demo_interval) + + except KeyboardInterrupt: + logger.info("Received interrupt signal, shutting down...") + except Exception as e: + logger.critical(f"Example failed: {e}", exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/libp2p/discovery/random_walk/__init__.py b/libp2p/discovery/random_walk/__init__.py new file mode 100644 index 000000000..0b455afd8 --- /dev/null +++ b/libp2p/discovery/random_walk/__init__.py @@ -0,0 +1,17 @@ +"""Random walk discovery modules for py-libp2p.""" + +from .rt_refresh_manager import RTRefreshManager +from .random_walk import RandomWalk +from .exceptions import ( + RoutingTableRefreshError, + RandomWalkError, + PeerValidationError, +) + +__all__ = [ + "RTRefreshManager", + "RandomWalk", + "RoutingTableRefreshError", + "RandomWalkError", + "PeerValidationError", +] diff --git a/libp2p/discovery/random_walk/config.py b/libp2p/discovery/random_walk/config.py new file mode 100644 index 000000000..834a248f2 --- /dev/null +++ b/libp2p/discovery/random_walk/config.py @@ -0,0 +1,16 @@ +from typing import Final + +# Timing constants (matching go-libp2p) +PEER_PING_TIMEOUT: Final[float] = 10.0 # seconds +REFRESH_QUERY_TIMEOUT: Final[float] = 60.0 # seconds +REFRESH_INTERVAL: Final[float] = 10.0 # 10 seconds for demonstration +SUCCESSFUL_OUTBOUND_QUERY_GRACE_PERIOD: Final[float] = 60.0 # 1 minute + +# Routing table thresholds +MIN_RT_REFRESH_THRESHOLD: Final[int] = 4 # Minimum peers before triggering refresh +MAX_N_BOOTSTRAPPERS: Final[int] = 2 # Maximum bootstrap peers to try + +# Random walk specific +RANDOM_WALK_CONCURRENCY: Final[int] = 3 # Number of concurrent random walks +RANDOM_WALK_ENABLED: Final[bool] = True # Enable automatic random walks +RANDOM_WALK_RT_THRESHOLD: Final[int] = 20 # RT size threshold for peerstore fallback diff --git a/libp2p/discovery/random_walk/exceptions.py b/libp2p/discovery/random_walk/exceptions.py new file mode 100644 index 000000000..283256199 --- /dev/null +++ b/libp2p/discovery/random_walk/exceptions.py @@ -0,0 +1,19 @@ +from libp2p.exceptions import BaseLibp2pError + + +class RoutingTableRefreshError(BaseLibp2pError): + """Base exception for routing table refresh operations.""" + + pass + + +class RandomWalkError(RoutingTableRefreshError): + """Exception raised during random walk operations.""" + + pass + + +class PeerValidationError(RoutingTableRefreshError): + """Exception raised when peer validation fails.""" + + pass diff --git a/libp2p/discovery/random_walk/random_walk.py b/libp2p/discovery/random_walk/random_walk.py new file mode 100644 index 000000000..177843749 --- /dev/null +++ b/libp2p/discovery/random_walk/random_walk.py @@ -0,0 +1,218 @@ +from collections.abc import Awaitable, Callable +import logging +import secrets + +import trio + +from libp2p.abc import IHost +from libp2p.discovery.random_walk.config import ( + RANDOM_WALK_CONCURRENCY, + RANDOM_WALK_RT_THRESHOLD, + REFRESH_QUERY_TIMEOUT, +) +from libp2p.discovery.random_walk.exceptions import RandomWalkError +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo + +logger = logging.getLogger("libp2p.discovery.random_walk") + + +class RandomWalk: + """ + Random Walk implementation for peer discovery in Kademlia DHT. + + Generates random peer IDs and performs FIND_NODE queries to discover + new peers and populate the routing table. + """ + + def __init__( + self, + host: IHost, + local_peer_id: ID, + query_function: Callable[[bytes], Awaitable[list[ID]]], + ): + """ + Initialize Random Walk module. + + Args: + host: The libp2p host instance + local_peer_id: Local peer ID + query_function: Function to query for closest peers given target key bytes + + """ + self.host = host + self.local_peer_id = local_peer_id + self.query_function = query_function + + def generate_random_peer_id(self) -> str: + """ + Generate a completely random peer ID + for random walk queries. + + Returns: + Random peer ID as string + + """ + # Generate 32 random bytes (256 bits) - same as go-libp2p + random_bytes = secrets.token_bytes(32) + # Convert to hex string for query + return random_bytes.hex() + + async def perform_random_walk(self) -> list[PeerInfo]: + """ + Perform a single random walk operation. + + Returns: + List of validated peers discovered during the walk + + """ + try: + # Generate random peer ID + random_peer_id = self.generate_random_peer_id() + logger.info(f"Starting random walk for peer ID: {random_peer_id}") + + # Perform FIND_NODE query + discovered_peer_ids: list[ID] = [] + + with trio.move_on_after(REFRESH_QUERY_TIMEOUT): + # Call the query function with target key bytes + target_key = bytes.fromhex(random_peer_id) + discovered_peer_ids = await self.query_function(target_key) or [] + + if not discovered_peer_ids: + logger.debug(f"No peers discovered in random walk for {random_peer_id}") + return [] + + logger.info( + f"Discovered {len(discovered_peer_ids)} peers in random walk " + f"for {random_peer_id[:8]}..." # Show only first 8 chars for brevity + ) + + # Convert peer IDs to PeerInfo objects and validate + validated_peers: list[PeerInfo] = [] + + for peer_id in discovered_peer_ids: + try: + # Get addresses from peerstore + addrs = self.host.get_peerstore().addrs(peer_id) + if addrs: + peer_info = PeerInfo(peer_id, addrs) + validated_peers.append(peer_info) + except Exception as e: + logger.debug(f"Failed to create PeerInfo for {peer_id}: {e}") + continue + + return validated_peers + + except Exception as e: + logger.error(f"Random walk failed: {e}") + raise RandomWalkError(f"Random walk operation failed: {e}") from e + + async def run_concurrent_random_walks( + self, count: int = RANDOM_WALK_CONCURRENCY, current_routing_table_size: int = 0 + ) -> list[PeerInfo]: + """ + Run multiple random walks concurrently. + + Args: + count: Number of concurrent random walks to perform + current_routing_table_size: Current size of routing table (for optimization) + + Returns: + Combined list of all validated peers discovered + + """ + all_validated_peers: list[PeerInfo] = [] + logger.info(f"Starting {count} concurrent random walks)") + + # First, try to add peers from peerstore if routing table is small + if current_routing_table_size < RANDOM_WALK_RT_THRESHOLD: + try: + peerstore_peers = self._get_peerstore_peers() + if peerstore_peers: + logger.debug( + f"RT size ({current_routing_table_size}) below threshold, " + f"adding {len(peerstore_peers)} peerstore peers" + ) + all_validated_peers.extend(peerstore_peers) + except Exception as e: + logger.warning(f"Error processing peerstore peers: {e}") + + async def single_walk() -> None: + try: + peers = await self.perform_random_walk() + all_validated_peers.extend(peers) + except Exception as e: + logger.warning(f"Concurrent random walk failed: {e}") + return + + # Run concurrent random walks + async with trio.open_nursery() as nursery: + for _ in range(count): + nursery.start_soon(single_walk) + + # Remove duplicates based on peer ID + unique_peers = {} + for peer in all_validated_peers: + unique_peers[peer.peer_id] = peer + + result = list(unique_peers.values()) + logger.info( + f"Concurrent random walks completed: {len(result)} unique peers discovered" + ) + return result + + def _get_peerstore_peers(self) -> list[PeerInfo]: + """ + Get peer info objects from the host's peerstore. + + Returns: + List of PeerInfo objects from peerstore + + """ + try: + peerstore = self.host.get_peerstore() + peer_ids = peerstore.peers_with_addrs() + + peer_infos = [] + for peer_id in peer_ids: + try: + # Skip local peer + if peer_id == self.local_peer_id: + continue + + peer_info = peerstore.peer_info(peer_id) + if peer_info and peer_info.addrs: + # Filter for compatible addresses (TCP + IPv4) + if self._has_compatible_addresses(peer_info): + peer_infos.append(peer_info) + except Exception as e: + logger.debug(f"Error getting peer info for {peer_id}: {e}") + + return peer_infos + + except Exception as e: + logger.warning(f"Error accessing peerstore: {e}") + return [] + + def _has_compatible_addresses(self, peer_info: PeerInfo) -> bool: + """ + Check if a peer has TCP+IPv4 compatible addresses. + + Args: + peer_info: PeerInfo to check + + Returns: + True if peer has compatible addresses + + """ + if not peer_info.addrs: + return False + + for addr in peer_info.addrs: + addr_str = str(addr) + # Check for TCP and IPv4 compatibility, avoid QUIC + if "/tcp/" in addr_str and "/ip4/" in addr_str and "/quic" not in addr_str: + return True + + return False diff --git a/libp2p/discovery/random_walk/rt_refresh_manager.py b/libp2p/discovery/random_walk/rt_refresh_manager.py new file mode 100644 index 000000000..7ed63cbda --- /dev/null +++ b/libp2p/discovery/random_walk/rt_refresh_manager.py @@ -0,0 +1,208 @@ +from collections.abc import Awaitable, Callable +import logging +import time +from typing import Protocol + +import trio + +from libp2p.abc import IHost +from libp2p.discovery.random_walk.config import ( + MIN_RT_REFRESH_THRESHOLD, + RANDOM_WALK_CONCURRENCY, + RANDOM_WALK_ENABLED, + REFRESH_INTERVAL, +) +from libp2p.discovery.random_walk.exceptions import RoutingTableRefreshError +from libp2p.discovery.random_walk.random_walk import RandomWalk +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo + + +class RoutingTableProtocol(Protocol): + """Protocol for routing table operations needed by RT refresh manager.""" + + def size(self) -> int: + """Return the current size of the routing table.""" + ... + + async def add_peer(self, peer_obj: PeerInfo) -> bool: + """Add a peer to the routing table.""" + ... + + +logger = logging.getLogger("libp2p.discovery.random_walk.rt_refresh_manager") + + +class RTRefreshManager: + """ + Routing Table Refresh Manager for py-libp2p. + + Manages periodic routing table refreshes and random walk operations + to maintain routing table health and discover new peers. + """ + + def __init__( + self, + host: IHost, + routing_table: RoutingTableProtocol, + local_peer_id: ID, + query_function: Callable[[bytes], Awaitable[list[ID]]], + enable_auto_refresh: bool = RANDOM_WALK_ENABLED, + refresh_interval: float = REFRESH_INTERVAL, + min_refresh_threshold: int = MIN_RT_REFRESH_THRESHOLD, + ): + """ + Initialize RT Refresh Manager. + + Args: + host: The libp2p host instance + routing_table: Routing table of host + local_peer_id: Local peer ID + query_function: Function to query for closest peers given target key bytes + enable_auto_refresh: Whether to enable automatic refresh + refresh_interval: Interval between refreshes in seconds + min_refresh_threshold: Minimum RT size before triggering refresh + + """ + self.host = host + self.routing_table = routing_table + self.local_peer_id = local_peer_id + self.query_function = query_function + + self.enable_auto_refresh = enable_auto_refresh + self.refresh_interval = refresh_interval + self.min_refresh_threshold = min_refresh_threshold + + # Initialize random walk module + self.random_walk = RandomWalk( + host=host, + local_peer_id=self.local_peer_id, + query_function=query_function, + ) + + # Control variables + self._running = False + self._nursery: trio.Nursery | None = None + + # Tracking + self._last_refresh_time = 0.0 + self._refresh_done_callbacks: list[Callable[[], None]] = [] + + async def start(self) -> None: + """Start the RT Refresh Manager.""" + if self._running: + logger.warning("RT Refresh Manager is already running") + return + + self._running = True + + logger.info("Starting RT Refresh Manager") + + # Start the main loop + async with trio.open_nursery() as nursery: + self._nursery = nursery + nursery.start_soon(self._main_loop) + + async def stop(self) -> None: + """Stop the RT Refresh Manager.""" + if not self._running: + return + + logger.info("Stopping RT Refresh Manager") + self._running = False + + async def _main_loop(self) -> None: + """Main loop for the RT Refresh Manager.""" + logger.info("RT Refresh Manager main loop started") + + # Initial refresh if auto-refresh is enabled + if self.enable_auto_refresh: + await self._do_refresh(force=True) + + try: + while self._running: + async with trio.open_nursery() as nursery: + # Schedule periodic refresh if enabled + if self.enable_auto_refresh: + nursery.start_soon(self._periodic_refresh_task) + + except Exception as e: + logger.error(f"RT Refresh Manager main loop error: {e}") + finally: + logger.info("RT Refresh Manager main loop stopped") + + async def _periodic_refresh_task(self) -> None: + """Task for periodic refreshes.""" + while self._running: + await trio.sleep(self.refresh_interval) + if self._running: + await self._do_refresh() + + async def _do_refresh(self, force: bool = False) -> None: + """ + Perform routing table refresh operation. + + Args: + force: Whether to force refresh regardless of timing + + """ + try: + current_time = time.time() + + # Check if refresh is needed + if not force: + if current_time - self._last_refresh_time < self.refresh_interval: + logger.debug("Skipping refresh: interval not elapsed") + return + + if self.routing_table.size() >= self.min_refresh_threshold: + logger.debug("Skipping refresh: routing table size above threshold") + return + + logger.info(f"Starting routing table refresh (force={force})") + start_time = current_time + + # Perform random walks to discover new peers + logger.info("Running concurrent random walks to discover new peers") + current_rt_size = self.routing_table.size() + discovered_peers = await self.random_walk.run_concurrent_random_walks( + count=RANDOM_WALK_CONCURRENCY, + current_routing_table_size=current_rt_size, + ) + + # Add discovered peers to routing table + added_count = 0 + for peer_info in discovered_peers: + result = await self.routing_table.add_peer(peer_info) + if result: + added_count += 1 + + self._last_refresh_time = current_time + + duration = time.time() - start_time + logger.info( + f"Routing table refresh completed: " + f"{added_count}/{len(discovered_peers)} peers added, " + f"RT size: {self.routing_table.size()}, " + f"duration: {duration:.2f}s" + ) + + # Notify refresh completion + for callback in self._refresh_done_callbacks: + try: + callback() + except Exception as e: + logger.warning(f"Refresh callback error: {e}") + + except Exception as e: + logger.error(f"Routing table refresh failed: {e}") + raise RoutingTableRefreshError(f"Refresh operation failed: {e}") from e + + def add_refresh_done_callback(self, callback: Callable[[], None]) -> None: + """Add a callback to be called when refresh completes.""" + self._refresh_done_callbacks.append(callback) + + def remove_refresh_done_callback(self, callback: Callable[[], None]) -> None: + """Remove a refresh completion callback.""" + if callback in self._refresh_done_callbacks: + self._refresh_done_callbacks.remove(callback) diff --git a/libp2p/kad_dht/kad_dht.py b/libp2p/kad_dht/kad_dht.py index dcf323ba1..097b6c489 100644 --- a/libp2p/kad_dht/kad_dht.py +++ b/libp2p/kad_dht/kad_dht.py @@ -5,6 +5,7 @@ implementation based on the Kademlia algorithm and protocol. """ +from collections.abc import Awaitable, Callable from enum import ( Enum, ) @@ -20,6 +21,7 @@ from libp2p.abc import ( IHost, ) +from libp2p.discovery.random_walk.rt_refresh_manager import RTRefreshManager from libp2p.network.stream.net_stream import ( INetStream, ) @@ -73,14 +75,27 @@ class KadDHT(Service): This class provides a DHT implementation that combines routing table management, peer discovery, content routing, and value storage. + + Optional Random Walk feature enhances peer discovery by automatically + performing periodic random queries to discover new peers and maintain + routing table health. + + Example: + # Basic DHT without random walk (default) + dht = KadDHT(host, DHTMode.SERVER) + + # DHT with random walk enabled for enhanced peer discovery + dht = KadDHT(host, DHTMode.SERVER, enable_random_walk=True) + """ - def __init__(self, host: IHost, mode: DHTMode): + def __init__(self, host: IHost, mode: DHTMode, enable_random_walk: bool = False): """ Initialize a new Kademlia DHT node. :param host: The libp2p host. :param mode: The mode of host (Client or Server) - must be DHTMode enum + :param enable_random_walk: Whether to enable automatic random walk """ super().__init__() @@ -92,6 +107,7 @@ def __init__(self, host: IHost, mode: DHTMode): raise TypeError(f"mode must be DHTMode enum, got {type(mode)}") self.mode = mode + self.enable_random_walk = enable_random_walk # Initialize the routing table self.routing_table = RoutingTable(self.local_peer_id, self.host) @@ -108,13 +124,56 @@ def __init__(self, host: IHost, mode: DHTMode): # Last time we republished provider records self._last_provider_republish = time.time() + # Initialize RT Refresh Manager (only if random walk is enabled) + self.rt_refresh_manager: RTRefreshManager | None = None + if self.enable_random_walk: + self.rt_refresh_manager = RTRefreshManager( + host=self.host, + routing_table=self.routing_table, + local_peer_id=self.local_peer_id, + query_function=self._create_query_function(), + enable_auto_refresh=True, + ) + # Set protocol handlers host.set_stream_handler(PROTOCOL_ID, self.handle_stream) + def _create_query_function(self) -> Callable[[bytes], Awaitable[list[ID]]]: + """ + Create a query function that wraps peer_routing.find_closest_peers_network. + + This function is used by the RandomWalk module to query for peers without + directly importing PeerRouting, avoiding circular import issues. + + Returns: + Callable that takes target_key bytes and returns list of peer IDs + + """ + + async def query_function(target_key: bytes) -> list[ID]: + """Query for closest peers to target key.""" + return await self.peer_routing.find_closest_peers_network(target_key) + + return query_function + async def run(self) -> None: """Run the DHT service.""" logger.info(f"Starting Kademlia DHT with peer ID {self.local_peer_id}") + # Start the RT Refresh Manager in parallel with the main DHT service + async with trio.open_nursery() as nursery: + # Start the RT Refresh Manager only if random walk is enabled + if self.rt_refresh_manager is not None: + nursery.start_soon(self.rt_refresh_manager.start) + logger.info("RT Refresh Manager started - Random Walk is now active") + else: + logger.info("Random Walk is disabled - RT Refresh Manager not started") + + # Start the main DHT service loop + nursery.start_soon(self._run_main_loop) + + async def _run_main_loop(self) -> None: + """Run the main DHT service loop.""" # Main service loop while self.manager.is_running: # Periodically refresh the routing table @@ -135,6 +194,17 @@ async def run(self) -> None: # Wait before next maintenance cycle await trio.sleep(ROUTING_TABLE_REFRESH_INTERVAL) + async def stop(self) -> None: + """Stop the DHT service and cleanup resources.""" + logger.info("Stopping Kademlia DHT") + + # Stop the RT Refresh Manager only if it was started + if self.rt_refresh_manager is not None: + await self.rt_refresh_manager.stop() + logger.info("RT Refresh Manager stopped") + else: + logger.info("RT Refresh Manager was not running (Random Walk disabled)") + async def switch_mode(self, new_mode: DHTMode) -> DHTMode: """ Switch the DHT mode. @@ -614,3 +684,15 @@ def get_value_store_size(self) -> int: """ return self.value_store.size() + + def is_random_walk_enabled(self) -> bool: + """ + Check if random walk peer discovery is enabled. + + Returns + ------- + bool + True if random walk is enabled, False otherwise. + + """ + return self.enable_random_walk diff --git a/libp2p/kad_dht/peer_routing.py b/libp2p/kad_dht/peer_routing.py index 4bcdb6474..c4a066f7d 100644 --- a/libp2p/kad_dht/peer_routing.py +++ b/libp2p/kad_dht/peer_routing.py @@ -170,7 +170,7 @@ async def find_closest_peers_network( # Return early if we have no peers to start with if not closest_peers: - logger.warning("No local peers available for network lookup") + logger.debug("No local peers available for network lookup") return [] # Iterative lookup until convergence diff --git a/libp2p/kad_dht/routing_table.py b/libp2p/kad_dht/routing_table.py index 15b6721e4..07e161009 100644 --- a/libp2p/kad_dht/routing_table.py +++ b/libp2p/kad_dht/routing_table.py @@ -591,6 +591,20 @@ def get_stale_peers(self, stale_threshold_seconds: int = 3600) -> list[ID]: stale_peers.extend(bucket.get_stale_peers(stale_threshold_seconds)) return stale_peers + def get_peer_infos(self) -> list[PeerInfo]: + """ + Get all PeerInfo objects in the routing table. + + Returns + ------- + List[PeerInfo]: List of all PeerInfo objects + + """ + peer_infos = [] + for bucket in self.buckets: + peer_infos.extend(bucket.peer_infos()) + return peer_infos + def cleanup_routing_table(self) -> None: """ Cleanup the routing table by removing all data. diff --git a/newsfragments/822.feature.rst b/newsfragments/822.feature.rst new file mode 100644 index 000000000..f9aa3c0ea --- /dev/null +++ b/newsfragments/822.feature.rst @@ -0,0 +1 @@ +Added `Random Walk` peer discovery module that enables random peer exploration for improved peer discovery.