diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 706d649a7..93fb21690 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -67,14 +67,12 @@ class Swarm(Service, INetworkService): peerstore: IPeerStore upgrader: TransportUpgrader transport: ITransport - # TODO: Connection and `peer_id` are 1-1 mapping in our implementation, - # whereas in Go one `peer_id` may point to multiple connections. - connections: dict[ID, INetConn] + # Allow multiple connections per peer (list of INetConn) + connections: dict[ID, list[INetConn]] listeners: dict[str, IListener] common_stream_handler: StreamHandlerFn listener_nursery: trio.Nursery | None event_listener_nursery_created: trio.Event - notifees: list[INotifee] def __init__( @@ -88,14 +86,10 @@ def __init__( self.peerstore = peerstore self.upgrader = upgrader self.transport = transport - self.connections = dict() + self.connections = dict() # peer_id -> list of INetConn self.listeners = dict() - - # Create Notifee array self.notifees = [] - self.common_stream_handler = create_default_stream_handler(self) - self.listener_nursery = None self.event_listener_nursery_created = trio.Event() @@ -118,49 +112,57 @@ def get_peer_id(self) -> ID: def set_stream_handler(self, stream_handler: StreamHandlerFn) -> None: self.common_stream_handler = stream_handler - async def dial_peer(self, peer_id: ID) -> INetConn: + async def dial_peer(self, peer_id: ID, max_retries: int = 3, base_delay: float = 0.5) -> INetConn: """ - Try to create a connection to peer_id. - + Try to create a connection to peer_id with retry and exponential backoff. :param peer_id: peer if we want to dial + :param max_retries: number of retry attempts + :param base_delay: base delay for exponential backoff :raises SwarmException: raised when an error occurs :return: muxed connection """ - if peer_id in self.connections: - # If muxed connection already exists for peer_id, - # set muxed connection equal to existing muxed connection - return self.connections[peer_id] - - logger.debug("attempting to dial peer %s", peer_id) - - try: - # Get peer info from peer store - addrs = self.peerstore.addrs(peer_id) - except PeerStoreError as error: - raise SwarmException(f"No known addresses to peer {peer_id}") from error - - if not addrs: - raise SwarmException(f"No known addresses to peer {peer_id}") + # If we already have a connection, return the first one + if peer_id in self.connections and self.connections[peer_id]: + return self.connections[peer_id][0] + logger.info(f"Attempting to dial peer {peer_id}") + attempt = 0 exceptions: list[SwarmException] = [] - - # Try all known addresses - for multiaddr in addrs: + while attempt < max_retries: try: - return await self.dial_addr(multiaddr, peer_id) - except SwarmException as e: - exceptions.append(e) - logger.debug( - "encountered swarm exception when trying to connect to %s, " - "trying next address...", - multiaddr, - exc_info=e, - ) - - # Tried all addresses, raising exception. + # Get peer info from peer store + addrs = self.peerstore.addrs(peer_id) + except PeerStoreError as error: + logger.error(f"No known addresses to peer {peer_id}: {error}") + raise SwarmException(f"No known addresses to peer {peer_id}") from error + if not addrs: + logger.error(f"No known addresses to peer {peer_id}") + raise SwarmException(f"No known addresses to peer {peer_id}") + for multiaddr in addrs: + try: + conn = await self.dial_addr(multiaddr, peer_id) + # Store the connection + if peer_id not in self.connections: + self.connections[peer_id] = [] + self.connections[peer_id].append(conn) + logger.info(f"Successfully connected to peer {peer_id} at {multiaddr}") + # Metrics: increment successful connection counter here + return conn + except SwarmException as e: + exceptions.append(e) + logger.warning( + f"Attempt {attempt+1}: Failed to connect to {peer_id} at {multiaddr}: {e}" + ) + # Exponential backoff before retrying + attempt += 1 + delay = base_delay * (2 ** (attempt - 1)) + logger.info(f"Retrying to dial peer {peer_id} after {delay:.2f}s (attempt {attempt+1})") + await trio.sleep(delay) + # All attempts failed + logger.error(f"Unable to connect to {peer_id} after {max_retries} attempts") + # Metrics: increment failed connection counter here raise SwarmException( - f"unable to connect to {peer_id}, no addresses established a successful " - "connection (with exceptions)" + f"unable to connect to {peer_id}, no addresses established a successful connection (with exceptions)" ) from MultiError(exceptions) async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn: @@ -318,8 +320,8 @@ async def close(self) -> None: # Close all connections manually if hasattr(self, "connections"): for conn_id in list(self.connections.keys()): - conn = self.connections[conn_id] - await conn.close() + for conn in self.connections[conn_id]: + await conn.close() # Clear connection tracking dictionary self.connections.clear() @@ -343,10 +345,10 @@ async def close(self) -> None: async def close_peer(self, peer_id: ID) -> None: if peer_id not in self.connections: return - connection = self.connections[peer_id] - # NOTE: `connection.close` will delete `peer_id` from `self.connections` - # and `notify_disconnected` for us. - await connection.close() + for conn in self.connections[peer_id]: + # NOTE: `conn.close` will delete `peer_id` from `self.connections` + # and `notify_disconnected` for us. + await conn.close() logger.debug("successfully close the connection to peer %s", peer_id) @@ -366,7 +368,9 @@ async def add_conn(self, muxed_conn: IMuxedConn) -> SwarmConn: self.manager.run_task(swarm_conn.start) await swarm_conn.event_started.wait() # Store muxed_conn with peer id - self.connections[muxed_conn.peer_id] = swarm_conn + if muxed_conn.peer_id not in self.connections: + self.connections[muxed_conn.peer_id] = [] + self.connections[muxed_conn.peer_id].append(swarm_conn) # Call notifiers since event occurred await self.notify_connected(swarm_conn) return swarm_conn @@ -379,7 +383,7 @@ def remove_conn(self, swarm_conn: SwarmConn) -> None: peer_id = swarm_conn.muxed_conn.peer_id if peer_id not in self.connections: return - del self.connections[peer_id] + self.connections[peer_id].remove(swarm_conn) # Notifee