feat(swarm): add retry logic, exponential backoff, and multi-connection support to Swarm. #743

Open · wants to merge 2 commits into base: main
106 changes: 55 additions & 51 deletions libp2p/network/swarm.py
@@ -67,14 +67,12 @@ class Swarm(Service, INetworkService):
peerstore: IPeerStore
upgrader: TransportUpgrader
transport: ITransport
# TODO: Connection and `peer_id` are 1-1 mapping in our implementation,
# whereas in Go one `peer_id` may point to multiple connections.
connections: dict[ID, INetConn]
# Allow multiple connections per peer (list of INetConn)
connections: dict[ID, list[INetConn]]
listeners: dict[str, IListener]
common_stream_handler: StreamHandlerFn
listener_nursery: trio.Nursery | None
event_listener_nursery_created: trio.Event

notifees: list[INotifee]

def __init__(
@@ -88,14 +86,10 @@ def __init__(
self.peerstore = peerstore
self.upgrader = upgrader
self.transport = transport
self.connections = dict()
self.connections = dict() # peer_id -> list of INetConn
self.listeners = dict()

# Create Notifee array
self.notifees = []

self.common_stream_handler = create_default_stream_handler(self)

self.listener_nursery = None
self.event_listener_nursery_created = trio.Event()
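Call sites that previously treated `swarm.connections[peer_id]` as a single `INetConn` now receive a list. A minimal sketch of how a caller might adapt; the helper names below are hypothetical and not part of this diff:

```python
# Hypothetical helpers (not part of this diff) for the new
# peer_id -> list[INetConn] mapping.

def get_connections(swarm, peer_id):
    """Return every tracked connection to peer_id, or an empty list."""
    return swarm.connections.get(peer_id, [])


def get_primary_connection(swarm, peer_id):
    """Return the first tracked connection (mirroring dial_peer's `[0]` choice), or None."""
    conns = swarm.connections.get(peer_id, [])
    return conns[0] if conns else None
```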

@@ -118,49 +112,57 @@ def get_peer_id(self) -> ID:
def set_stream_handler(self, stream_handler: StreamHandlerFn) -> None:
self.common_stream_handler = stream_handler

async def dial_peer(self, peer_id: ID) -> INetConn:
async def dial_peer(self, peer_id: ID, max_retries: int = 3, base_delay: float = 0.5) -> INetConn:
"""
Try to create a connection to peer_id.

Try to create a connection to peer_id with retry and exponential backoff.
:param peer_id: peer ID we want to dial
:param max_retries: maximum number of dial attempts over the peer's known addresses
:param base_delay: base delay in seconds for the exponential backoff between attempts
:raises SwarmException: raised when an error occurs
:return: muxed connection
"""
if peer_id in self.connections:
# If muxed connection already exists for peer_id,
# set muxed connection equal to existing muxed connection
return self.connections[peer_id]

logger.debug("attempting to dial peer %s", peer_id)

try:
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
except PeerStoreError as error:
raise SwarmException(f"No known addresses to peer {peer_id}") from error

if not addrs:
raise SwarmException(f"No known addresses to peer {peer_id}")
# If we already have a connection, return the first one
if peer_id in self.connections and self.connections[peer_id]:
return self.connections[peer_id][0]

logger.info(f"Attempting to dial peer {peer_id}")
attempt = 0
exceptions: list[SwarmException] = []

# Try all known addresses
for multiaddr in addrs:
while attempt < max_retries:
try:
return await self.dial_addr(multiaddr, peer_id)
except SwarmException as e:
exceptions.append(e)
logger.debug(
"encountered swarm exception when trying to connect to %s, "
"trying next address...",
multiaddr,
exc_info=e,
)

# Tried all addresses, raising exception.
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
except PeerStoreError as error:
logger.error(f"No known addresses to peer {peer_id}: {error}")
raise SwarmException(f"No known addresses to peer {peer_id}") from error
if not addrs:
logger.error(f"No known addresses to peer {peer_id}")
raise SwarmException(f"No known addresses to peer {peer_id}")
for multiaddr in addrs:
try:
conn = await self.dial_addr(multiaddr, peer_id)
# Track the connection; add_conn (called inside dial_addr) may have
# registered it already, so avoid storing a duplicate entry
conns = self.connections.setdefault(peer_id, [])
if conn not in conns:
    conns.append(conn)
logger.info(f"Successfully connected to peer {peer_id} at {multiaddr}")
# Metrics: increment successful connection counter here
return conn
except SwarmException as e:
exceptions.append(e)
logger.warning(
f"Attempt {attempt+1}: Failed to connect to {peer_id} at {multiaddr}: {e}"
)
# Exponential backoff before the next pass; skip the sleep once the
# final attempt has been used up
attempt += 1
if attempt < max_retries:
    delay = base_delay * (2 ** (attempt - 1))
    logger.info(f"Retrying dial to peer {peer_id} in {delay:.2f}s (attempt {attempt + 1} of {max_retries})")
    await trio.sleep(delay)
# All attempts failed
logger.error(f"Unable to connect to {peer_id} after {max_retries} attempts")
# Metrics: increment failed connection counter here
raise SwarmException(
f"unable to connect to {peer_id}, no addresses established a successful "
"connection (with exceptions)"
f"unable to connect to {peer_id}, no addresses established a successful connection (with exceptions)"
) from MultiError(exceptions)
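With the defaults above (`max_retries=3`, `base_delay=0.5`), failed passes over a peer's addresses are separated by delays of 0.5 s and 1.0 s before the final attempt. A usage sketch, illustrative only, assuming `swarm` is a started `Swarm` whose peerstore already holds addresses for `peer_id`:

```python
import logging

logger = logging.getLogger(__name__)


async def connect_with_retries(swarm, peer_id):
    """Illustrative only: dial with a more aggressive retry schedule."""
    try:
        # Up to 5 passes over the peer's known addresses, backing off
        # 0.2s, 0.4s, 0.8s, 1.6s between passes (base_delay * 2 ** (attempt - 1)).
        return await swarm.dial_peer(peer_id, max_retries=5, base_delay=0.2)
    except Exception as exc:  # SwarmException raised by dial_peer
        logger.warning("giving up on peer %s: %s", peer_id, exc)
        return None
```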

async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn:
@@ -318,8 +320,8 @@ async def close(self) -> None:
# Close all connections manually
if hasattr(self, "connections"):
for conn_id in list(self.connections.keys()):
conn = self.connections[conn_id]
await conn.close()
# Iterate over a snapshot: conn.close() removes the connection from this list
for conn in list(self.connections[conn_id]):
await conn.close()

# Clear connection tracking dictionary
self.connections.clear()
@@ -343,10 +345,10 @@ async def close(self) -> None:
async def close_peer(self, peer_id: ID) -> None:
if peer_id not in self.connections:
return
connection = self.connections[peer_id]
# NOTE: `connection.close` will delete `peer_id` from `self.connections`
# and `notify_disconnected` for us.
await connection.close()
# Iterate over a snapshot: `conn.close` removes the connection from
# `self.connections[peer_id]` and calls `notify_disconnected` for us.
for conn in list(self.connections[peer_id]):
await conn.close()

logger.debug("successfully close the connection to peer %s", peer_id)

@@ -366,7 +368,9 @@ async def add_conn(self, muxed_conn: IMuxedConn) -> SwarmConn:
self.manager.run_task(swarm_conn.start)
await swarm_conn.event_started.wait()
# Store muxed_conn with peer id
self.connections[muxed_conn.peer_id] = swarm_conn
if muxed_conn.peer_id not in self.connections:
self.connections[muxed_conn.peer_id] = []
self.connections[muxed_conn.peer_id].append(swarm_conn)
# Call notifiers since event occurred
await self.notify_connected(swarm_conn)
return swarm_conn
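Because `add_conn` now appends inbound connections to the same per-peer list that `dial_peer` consults, dialing a peer that already connected to us reuses the tracked connection instead of opening a new one. A hedged sketch of that flow, assuming `swarm` is a started `Swarm` and `remote_peer` is a peer ID:

```python
async def reuse_or_dial(swarm, remote_peer):
    """Illustrative only: show that an existing (e.g. inbound) connection is reused."""
    already_tracked = bool(swarm.connections.get(remote_peer))
    conn = await swarm.dial_peer(remote_peer)
    # If the peer had already connected to us, dial_peer short-circuits and
    # returns connections[remote_peer][0] instead of dialing again.
    assert not already_tracked or conn is swarm.connections[remote_peer][0]
    return conn
```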
@@ -379,7 +383,7 @@ def remove_conn(self, swarm_conn: SwarmConn) -> None:
peer_id = swarm_conn.muxed_conn.peer_id
if peer_id not in self.connections:
return
del self.connections[peer_id]
self.connections[peer_id].remove(swarm_conn)
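Note that `remove_conn` leaves an empty list behind once the last connection to a peer is gone, so connectivity checks should test the list itself rather than bare key presence, as `dial_peer` does above. A small illustrative helper (hypothetical, not part of this diff):

```python
def has_live_connection(swarm, peer_id):
    # Key presence alone is not enough once remove_conn has emptied the list.
    return bool(swarm.connections.get(peer_id))
```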

# Notifee
