diff --git a/examples/circuit_relay/__init__.py b/examples/circuit_relay/__init__.py new file mode 100644 index 000000000..ccab8b932 --- /dev/null +++ b/examples/circuit_relay/__init__.py @@ -0,0 +1,5 @@ +""" +Circuit Relay v2 example module. + +This package demonstrates the usage of Circuit Relay v2 protocol in libp2p. +""" diff --git a/examples/circuit_relay/relay_example.py b/examples/circuit_relay/relay_example.py new file mode 100644 index 000000000..47b62552c --- /dev/null +++ b/examples/circuit_relay/relay_example.py @@ -0,0 +1,484 @@ +""" +Circuit Relay v2 Example. + +This example demonstrates using the Circuit Relay v2 protocol by setting up: +1. A relay node that facilitates connections +2. A destination node that accepts incoming connections +3. A source node that connects to the destination through the relay + +Usage: + # First terminal - start the relay: + python relay_example.py --role relay --port 8000 + + # Second terminal - start the destination: + python relay_example.py --role destination --port 8001 --relay-addr RELAY_PEER_ID + + # Third terminal - start the source: + python relay_example.py --role source \ + --relay-addr RELAY_PEER_ID \ + --dest-id DESTINATION_PEER_ID +""" + +import argparse +import logging +import sys + +import multiaddr +import trio + +from libp2p import new_host +from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.network.stream.net_stream import INetStream +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr +from libp2p.relay.circuit_v2.config import RelayConfig +from libp2p.relay.circuit_v2.discovery import RelayDiscovery +from libp2p.relay.circuit_v2.protocol import ( + PROTOCOL_ID, + STOP_PROTOCOL_ID, + CircuitV2Protocol, +) +from libp2p.relay.circuit_v2.resources import RelayLimits +from libp2p.relay.circuit_v2.transport import CircuitV2Transport +from libp2p.tools.async_service import background_trio_service + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(name)s | %(levelname)s | %(message)s", +) +logger = logging.getLogger("circuit-relay-example") + +# Application protocol for our example +EXAMPLE_PROTOCOL_ID = TProtocol("/circuit-relay-example/1.0.0") +MAX_READ_LEN = 2**16 # 64KB + + +async def handle_example_protocol(stream: INetStream) -> None: + """Handle incoming messages on our example protocol.""" + remote_peer_id = stream.muxed_conn.peer_id + logger.info(f"New stream from peer: {remote_peer_id}") + + try: + # Read the incoming message + msg = await stream.read(MAX_READ_LEN) + if msg: + logger.info(f"Received message: {msg.decode()}") + + # Send a response + # Get the local peer ID from the secure connection + local_peer_id = stream.muxed_conn.peer_id + response = f"Hello! This is {local_peer_id}".encode() + await stream.write(response) + logger.info(f"Sent response to {remote_peer_id}") + except Exception as e: + logger.error(f"Error handling stream: {e}") + finally: + await stream.close() + + +async def setup_relay_node(port: int, seed: int | None = None) -> None: + """Set up and run a relay node.""" + logger.info("Starting relay node...") + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the relay + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=True, # Act as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use other relays if needed + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=True) + + # Start the host + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + async with host.run(listen_addrs=[listen_addr]): + # Print information about this node + peer_id = host.get_id() + logger.info(f"Relay node started with ID: {peer_id}") + + addrs = host.get_addrs() + for addr in addrs: + logger.info(f"Listening on: {addr}") + + # Register protocol handlers + host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) + host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) + logger.debug("Protocol handlers registered") + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and register the transport + CircuitV2Transport(host, protocol, relay_config) + logger.info("Circuit relay transport initialized") + + print("\nRelay node is running. Use the following address to connect:") + print(f"{addrs[0]}/p2p/{peer_id}") + print("\nPress Ctrl+C to exit\n") + + # Keep the relay running + await trio.sleep_forever() + + +async def setup_destination_node( + port: int, relay_addr: str, seed: int | None = None +) -> None: + """Set up and run a destination node that accepts incoming connections.""" + logger.info("Starting destination node...") + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the circuit relay client + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=False, # Not acting as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use relays for dialing + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) + + # Start the host + listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + + async with host.run(listen_addrs=[listen_addr]): + # Print information about this node + peer_id = host.get_id() + logger.info(f"Destination node started with ID: {peer_id}") + + addrs = host.get_addrs() + for addr in addrs: + logger.info(f"Listening on: {addr}") + + # Register protocol handlers + host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol) + host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream) + host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream) + logger.debug("Protocol handlers registered") + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and initialize transport + transport = CircuitV2Transport(host, protocol, relay_config) + + # Create discovery service + discovery = RelayDiscovery(host, auto_reserve=True) + transport.discovery = discovery + + # Start discovery service + async with background_trio_service(discovery): + logger.info("Relay discovery service started") + + # Connect to the relay + if relay_addr: + logger.info(f"Connecting to relay at {relay_addr}") + try: + # Handle both peer ID only or full multiaddr formats + if relay_addr.startswith("/"): + # Full multiaddr format + relay_maddr = multiaddr.Multiaddr(relay_addr) + relay_info = info_from_p2p_addr(relay_maddr) + else: + # Assume it's just a peer ID + relay_peer_id = ID.from_base58(relay_addr) + relay_info = PeerInfo( + relay_peer_id, + [ + multiaddr.Multiaddr( + f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}" + ) + ], + ) + logger.info( + f"Using constructed address: {relay_info.addrs[0]}" + ) + + await host.connect(relay_info) + logger.info(f"Connected to relay {relay_info.peer_id}") + except Exception as e: + logger.error(f"Failed to connect to relay: {e}") + return + + print("\nDestination node is running with peer ID:") + print(f"{peer_id}") + print("\nPress Ctrl+C to exit\n") + + # Keep the node running + await trio.sleep_forever() + + +async def setup_source_node( + relay_addr: str, dest_id: str, seed: int | None = None +) -> None: + """ + Set up and run a source node that connects to the destination + through the relay. + """ + logger.info("Starting source node...") + + if not relay_addr: + logger.error("Relay address is required for source mode") + return + + if not dest_id: + logger.error("Destination peer ID is required for source mode") + return + + # Create host with a fixed key if seed is provided + key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None) + host = new_host(key_pair=key_pair) + + # Configure the circuit relay client + limits = RelayLimits( + duration=3600, # 1 hour + data=1024 * 1024 * 100, # 100 MB + max_circuit_conns=10, + max_reservations=5, + ) + + relay_config = RelayConfig( + enable_hop=False, # Not acting as a relay + enable_stop=True, # Accept relayed connections + enable_client=True, # Use relays for dialing + limits=limits, + ) + + # Initialize the protocol + protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False) + + # Start the host + async with host.run( + listen_addrs=[multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0")] + ): # Use ephemeral port + # Print information about this node + peer_id = host.get_id() + logger.info(f"Source node started with ID: {peer_id}") + + # Get assigned address for debugging + addrs = host.get_addrs() + if addrs: + logger.info(f"Source node listening on: {addrs[0]}") + + # Start the relay protocol service + async with background_trio_service(protocol): + logger.info("Circuit relay protocol started") + + # Create and initialize transport + transport = CircuitV2Transport(host, protocol, relay_config) + + # Create discovery service + discovery = RelayDiscovery(host, auto_reserve=True) + transport.discovery = discovery + + # Start discovery service + async with background_trio_service(discovery): + logger.info("Relay discovery service started") + + # Connect to the relay + logger.info(f"Connecting to relay at {relay_addr}") + try: + # Handle both peer ID only or full multiaddr formats + if relay_addr.startswith("/"): + # Full multiaddr format + relay_maddr = multiaddr.Multiaddr(relay_addr) + relay_info = info_from_p2p_addr(relay_maddr) + else: + # Assume it's just a peer ID + relay_peer_id = ID.from_base58(relay_addr) + relay_info = PeerInfo( + relay_peer_id, + [ + multiaddr.Multiaddr( + f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}" + ) + ], + ) + logger.info(f"Using constructed address: {relay_info.addrs[0]}") + + await host.connect(relay_info) + logger.info(f"Connected to relay {relay_info.peer_id}") + + # Wait for relay discovery to find the relay + await trio.sleep(2) + + # Convert destination ID string to peer ID + dest_peer_id = ID.from_base58(dest_id) + + # Try to connect to the destination through the relay + logger.info( + f"Connecting to destination {dest_peer_id} through relay" + ) + + # Create peer info with relay + relay_peer_id = relay_info.peer_id + logger.info(f"This is the relay peer id: {relay_peer_id}") + + # Create a proper peer info with a relay address + # The destination peer should be reachable through a + # p2p-circuit address + circuit_addr = multiaddr.Multiaddr(f"/p2p-circuit/p2p/{dest_id}") + dest_peer_info = PeerInfo(dest_peer_id, [circuit_addr]) + logger.info(f"This is the dest peer info: {dest_peer_info}") + + # Dial through the relay + try: + logger.info( + f"Attempting to dial destination {dest_peer_id} " + f"through relay {relay_peer_id}" + ) + + connection = await transport.dial_peer_info( + dest_peer_info, relay_peer_id=relay_peer_id + ) + + logger.info(f"This is the dial connection: {connection}") + + logger.info( + "Successfully connected to destination through relay!" + ) + + # Open a stream to our example protocol + stream = await host.new_stream( + dest_peer_id, [EXAMPLE_PROTOCOL_ID] + ) + if stream: + logger.info( + f"Opened stream to destination with protocol " + f"{EXAMPLE_PROTOCOL_ID}" + ) + + # Send a message + msg = f"Hello from {peer_id}!".encode() + await stream.write(msg) + logger.info("Sent message to destination") + + # Wait for response + response = await stream.read(MAX_READ_LEN) + logger.info( + f"Received response: " + f"{response.decode() if response else 'No response'}" + ) + + # Close the stream + await stream.close() + else: + logger.error("Failed to open stream to destination") + except Exception as e: + logger.error(f"Failed to dial through relay: {str(e)}") + logger.error(f"Exception type: {type(e).__name__}") + raise + + except Exception as e: + logger.error(f"Error: {e}") + + print("\nSource operation completed") + # Keep running for a bit to allow messages to be processed + await trio.sleep(5) + + +def generate_fixed_private_key(seed: int | None) -> bytes: + """Generate a fixed private key from a seed for reproducible peer IDs.""" + import random + + if seed is None: + # Generate random bytes if no seed provided + return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big") + + random.seed(seed) + return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big") + + +def main() -> None: + """Parse arguments and run the appropriate node type.""" + parser = argparse.ArgumentParser(description="Circuit Relay v2 Example") + parser.add_argument( + "--role", + type=str, + choices=["relay", "source", "destination"], + required=True, + help="Node role (relay, source, or destination)", + ) + parser.add_argument( + "--port", + type=int, + default=0, + help="Port to listen on (for relay and destination nodes)", + ) + parser.add_argument( + "--relay-addr", + type=str, + help="Multiaddress or peer ID of relay node (for destination and source nodes)", + ) + parser.add_argument( + "--dest-id", + type=str, + help="Peer ID of destination node (for source node)", + ) + parser.add_argument( + "--seed", + type=int, + help="Random seed for reproducible peer IDs", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + # Set log level + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) + logging.getLogger("libp2p").setLevel(logging.DEBUG) + + try: + if args.role == "relay": + trio.run(setup_relay_node, args.port, args.seed) + elif args.role == "destination": + if not args.relay_addr: + parser.error("--relay-addr is required for destination role") + trio.run(setup_destination_node, args.port, args.relay_addr, args.seed) + elif args.role == "source": + if not args.relay_addr or not args.dest_id: + parser.error("--relay-addr and --dest-id are required for source role") + trio.run(setup_source_node, args.relay_addr, args.dest_id, args.seed) + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index 1cf76efa8..fd0cd6994 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -70,7 +70,7 @@ STREAM_READ_TIMEOUT = 15 # seconds STREAM_WRITE_TIMEOUT = 15 # seconds STREAM_CLOSE_TIMEOUT = 10 # seconds -MAX_READ_RETRIES = 5 # Maximum number of read retries +MAX_READ_RETRIES = 2 # Reduced retries to avoid masking real issues # Extended interfaces for type checking @@ -276,6 +276,7 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: This handler processes relay requests from other peers. """ + logger.debug("=== HOP STREAM HANDLER CALLED ===") try: # Try to get peer ID first try: @@ -290,112 +291,65 @@ async def _handle_hop_stream(self, stream: INetStream) -> None: logger.debug("Handling hop stream from %s", remote_id) - # First, handle the read timeout gracefully - try: - with trio.fail_after( - STREAM_READ_TIMEOUT * 2 - ): # Double the timeout for reading - msg_bytes = await stream.read() - if not msg_bytes: - logger.error( - "Empty read from stream from %s", - remote_id, - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = "Empty message received" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure message is sent - return - except trio.TooSlowError: - logger.error( - "Timeout reading from hop stream from %s", - remote_id, - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.CONNECTION_FAILED)) - pb_status.message = "Stream read timeout" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return - except Exception as e: - logger.error( - "Error reading from hop stream from %s: %s", - remote_id, - str(e), - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = f"Read error: {str(e)}" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return - - # Parse the message - try: - hop_msg = HopMessage() - hop_msg.ParseFromString(msg_bytes) - except Exception as e: - logger.error( - "Error parsing hop message from %s: %s", - remote_id, - str(e), - ) - # Create a proto Status directly - pb_status = PbStatus() - pb_status.code = cast(Any, int(StatusCode.MALFORMED_MESSAGE)) - pb_status.message = f"Parse error: {str(e)}" - - response = HopMessage( - type=HopMessage.STATUS, - status=pb_status, - ) - await stream.write(response.SerializeToString()) - await trio.sleep(0.5) # Longer wait to ensure the message is sent - return + # Handle multiple messages on the same stream + while True: + # Read message with timeout + try: + with trio.fail_after(STREAM_READ_TIMEOUT): + msg_bytes = await stream.read() + if not msg_bytes: + logger.debug("Stream closed by peer %s", remote_id) + return + except trio.TooSlowError: + logger.error("Timeout reading from hop stream from %s", remote_id) + return + except Exception as e: + logger.error( + "Error reading from hop stream from %s: %s", remote_id, str(e) + ) + return - # Process based on message type - if hop_msg.type == HopMessage.RESERVE: - logger.debug("Handling RESERVE message from %s", remote_id) - await self._handle_reserve(stream, hop_msg) - # For RESERVE requests, let the client close the stream - return - elif hop_msg.type == HopMessage.CONNECT: - logger.debug("Handling CONNECT message from %s", remote_id) - await self._handle_connect(stream, hop_msg) - else: - logger.error("Invalid message type %d from %s", hop_msg.type, remote_id) - # Send a nice error response using _send_status method - await self._send_status( - stream, - StatusCode.MALFORMED_MESSAGE, - f"Invalid message type: {hop_msg.type}", - ) + # Parse the message + try: + hop_msg = HopMessage() + hop_msg.ParseFromString(msg_bytes) + except Exception as e: + logger.error( + "Error parsing hop message from %s: %s", remote_id, str(e) + ) + await self._send_status( + stream, + StatusCode.MALFORMED_MESSAGE, + f"Parse error: {str(e)}", + ) + return + + # Process based on message type + if hop_msg.type == HopMessage.RESERVE: + logger.debug("Handling RESERVE message from %s", remote_id) + await self._handle_reserve(stream, hop_msg) + # Continue reading for more messages + elif hop_msg.type == HopMessage.CONNECT: + logger.debug("Handling CONNECT message from %s", remote_id) + await self._handle_connect(stream, hop_msg) + # CONNECT establishes a circuit, so we're done with this stream + return + else: + logger.error( + "Invalid message type %d from %s", hop_msg.type, remote_id + ) + await self._send_status( + stream, + StatusCode.MALFORMED_MESSAGE, + f"Invalid message type: {hop_msg.type}", + ) + return except Exception as e: logger.error( "Unexpected error handling hop stream from %s: %s", remote_id, str(e) ) try: - # Send a nice error response using _send_status method await self._send_status( stream, StatusCode.MALFORMED_MESSAGE, @@ -536,12 +490,8 @@ async def _handle_reserve(self, stream: INetStream, msg: Any) -> None: ttl, ) - # Send the response with increased timeout + # Send the response await stream.write(response.SerializeToString()) - - # Add a small wait to ensure the message is fully sent - await trio.sleep(0.1) - logger.debug("Reservation response sent successfully") except Exception as e: @@ -556,18 +506,11 @@ async def _handle_reserve(self, stream: INetStream, msg: Any) -> None: ) except Exception as send_err: logger.error("Failed to send error response: %s", str(send_err)) - finally: - # Always close the stream when done with reservation - if cast(INetStreamWithExtras, stream).is_open(): - try: - with trio.fail_after(STREAM_CLOSE_TIMEOUT): - await stream.close() - except Exception as close_err: - logger.error("Error closing stream: %s", str(close_err)) async def _handle_connect(self, stream: INetStream, msg: Any) -> None: """Handle a connect request.""" peer_id = ID(msg.peer) + logger.debug("Handling CONNECT request for peer %s", peer_id) dst_stream: INetStream | None = None # Verify reservation if provided @@ -594,12 +537,15 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: try: # Store the source stream with properly typed None self._active_relays[peer_id] = (stream, None) + logger.debug("Stored source stream for peer %s", peer_id) # Try to connect to the destination with timeout with trio.fail_after(STREAM_READ_TIMEOUT): + logger.debug("Attempting to connect to destination %s", peer_id) dst_stream = await self.host.new_stream(peer_id, [STOP_PROTOCOL_ID]) if not dst_stream: raise ConnectionError("Could not connect to destination") + logger.debug("Successfully connected to destination %s", peer_id) # Send STOP CONNECT message stop_msg = StopMessage( @@ -640,6 +586,7 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: reservation.active_connections += 1 # Send success status + logger.debug("Sending OK status to source") await self._send_status( stream, StatusCode.OK, @@ -653,6 +600,7 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: except (trio.TooSlowError, ConnectionError) as e: logger.error("Error establishing relay connection: %s", str(e)) + logger.debug("Sending CONNECTION_FAILED status to source") await self._send_status( stream, StatusCode.CONNECTION_FAILED, @@ -730,9 +678,11 @@ async def _relay_data( logger.error("Error relaying data: %s", str(e)) finally: # Clean up streams and remove from active relays - await src_stream.reset() - await dst_stream.reset() + # Only reset streams once to avoid double-reset issues if peer_id in self._active_relays: + src_stream_cleanup, dst_stream_cleanup = self._active_relays[peer_id] + await self._close_stream(src_stream_cleanup) + await self._close_stream(dst_stream_cleanup) del self._active_relays[peer_id] async def _send_status( @@ -744,7 +694,7 @@ async def _send_status( """Send a status message.""" try: logger.debug("Sending status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(STREAM_WRITE_TIMEOUT): # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( @@ -761,11 +711,7 @@ async def _send_status( logger.debug("Status message serialized (%d bytes)", len(msg_bytes)) await stream.write(msg_bytes) - logger.debug("Status message sent, waiting for processing") - - # Wait longer to ensure the message is sent - await trio.sleep(1.5) - logger.debug("Status message sending completed") + logger.debug("Status message sent successfully") except trio.TooSlowError: logger.error( "Timeout sending status message: code=%s, message=%s", code, message @@ -782,7 +728,7 @@ async def _send_stop_status( """Send a status message on a STOP stream.""" try: logger.debug("Sending stop status message with code %s: %s", code, message) - with trio.fail_after(STREAM_WRITE_TIMEOUT * 2): # Double the timeout + with trio.fail_after(STREAM_WRITE_TIMEOUT): # Create a proto Status directly pb_status = PbStatus() pb_status.code = cast( @@ -795,6 +741,5 @@ async def _send_stop_status( status=pb_status, ) await stream.write(status_msg.SerializeToString()) - await trio.sleep(0.5) # Ensure message is sent except Exception as e: logger.error("Error sending stop status message: %s", str(e)) diff --git a/libp2p/relay/circuit_v2/transport.py b/libp2p/relay/circuit_v2/transport.py index ffd310902..c472ac177 100644 --- a/libp2p/relay/circuit_v2/transport.py +++ b/libp2p/relay/circuit_v2/transport.py @@ -44,6 +44,7 @@ ) from .protocol import ( PROTOCOL_ID, + STREAM_READ_TIMEOUT, CircuitV2Protocol, ) from .protocol_buffer import ( @@ -159,9 +160,21 @@ async def dial_peer_info( raise ConnectionError("No suitable relay found") # Get a stream to the relay - relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) - if not relay_stream: - raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") + try: + logger.debug( + "Opening stream to relay %s with protocol %s", + relay_peer_id, + PROTOCOL_ID, + ) + relay_stream = await self.host.new_stream(relay_peer_id, [PROTOCOL_ID]) + if not relay_stream: + raise ConnectionError(f"Could not open stream to relay {relay_peer_id}") + logger.debug("Successfully opened stream to relay %s", relay_peer_id) + except Exception as e: + logger.error("Failed to open stream to relay %s: %s", relay_peer_id, str(e)) + raise ConnectionError( + f"Could not open stream to relay {relay_peer_id}: {str(e)}" + ) try: # First try to make a reservation if enabled @@ -179,10 +192,11 @@ async def dial_peer_info( ) await relay_stream.write(hop_msg.SerializeToString()) - # Read response - resp_bytes = await relay_stream.read() - resp = HopMessage() - resp.ParseFromString(resp_bytes) + # Read response with timeout + with trio.fail_after(STREAM_READ_TIMEOUT): + resp_bytes = await relay_stream.read() + resp = HopMessage() + resp.ParseFromString(resp_bytes) # Access status attributes directly status_code = getattr(resp.status, "code", StatusCode.OK) @@ -256,17 +270,51 @@ async def _make_reservation( type=HopMessage.RESERVE, peer=self.host.get_id().to_bytes(), ) - await stream.write(reserve_msg.SerializeToString()) - - # Read response - resp_bytes = await stream.read() - resp = HopMessage() - resp.ParseFromString(resp_bytes) + logger.debug("=== SENDING RESERVATION REQUEST ===") + logger.debug("Message type: %s", reserve_msg.type) + logger.debug("Peer ID: %s", self.host.get_id()) + logger.debug("Raw message: %s", reserve_msg) + + try: + await stream.write(reserve_msg.SerializeToString()) + logger.debug("Successfully sent reservation request") + except Exception as e: + logger.error("Failed to send reservation request: %s", str(e)) + raise + + # Read response with timeout + logger.debug("=== WAITING FOR RESERVATION RESPONSE ===") + with trio.fail_after(STREAM_READ_TIMEOUT): + try: + resp_bytes = await stream.read() + logger.debug( + "Received reservation response: %d bytes", len(resp_bytes) + ) + resp = HopMessage() + resp.ParseFromString(resp_bytes) + logger.debug("=== PARSED RESERVATION RESPONSE ===") + logger.debug("Message type: %s", resp.type) + logger.debug( + "Status code: %s", getattr(resp.status, "code", "unknown") + ) + logger.debug( + "Status message: %s", getattr(resp.status, "message", "unknown") + ) + logger.debug("Raw response: %s", resp) + except Exception as e: + logger.error( + "Failed to read/parse reservation response: %s", str(e) + ) + raise # Access status attributes directly status_code = getattr(resp.status, "code", StatusCode.OK) status_msg = getattr(resp.status, "message", "Unknown error") + logger.debug( + "Reservation response: code=%s, message=%s", status_code, status_msg + ) + if status_code != StatusCode.OK: logger.warning( "Reservation failed with relay %s: %s", diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 1598ea42a..81477c37f 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -160,15 +160,21 @@ async def dial(self, maddr: Multiaddr) -> IRawConnection: try: # trio.open_tcp_stream requires host to be str or bytes, not None. + logger.debug("=== OPENING TCP STREAM ===") + logger.debug("Host: %s", host_str) + logger.debug("Port: %d", port_int) stream = await trio.open_tcp_stream(host_str, port_int) + logger.debug("Successfully opened TCP stream") except OSError as error: # OSError is common for network issues like "Connection refused" # or "Host unreachable". + logger.error("Failed to open TCP stream: %s", error) raise OpenConnectionError( f"Failed to open TCP stream to {maddr}: {error}" ) from error except Exception as error: # Catch other potential errors from trio.open_tcp_stream and wrap them. + logger.error("Unexpected error opening TCP stream: %s", error) raise OpenConnectionError( f"An unexpected error occurred when dialing {maddr}: {error}" ) from error