Skip to content

Commit b7d62c0

Browse files
committed
Limit concurrency to push identify message to peers
Signed-off-by: sukhman <[email protected]>
1 parent 2277d82 commit b7d62c0

File tree

1 file changed

+19
-20
lines changed

1 file changed

+19
-20
lines changed

libp2p/identity/identify_push/identify_push.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
ID_PUSH = TProtocol("/ipfs/id/push/1.0.0")
4141
PROTOCOL_VERSION = "ipfs/0.1.0"
4242
AGENT_VERSION = get_agent_version()
43+
LIMIT = trio.CapacityLimiter(10)
4344

4445

4546
def identify_push_handler_for(host: IHost) -> StreamHandlerFn:
@@ -146,25 +147,26 @@ async def push_identify_to_peer(
146147
True if the push was successful, False otherwise.
147148
148149
"""
149-
try:
150-
# Create a new stream to the peer using the identify/push protocol
151-
stream = await host.new_stream(peer_id, [ID_PUSH])
150+
async with LIMIT:
151+
try:
152+
# Create a new stream to the peer using the identify/push protocol
153+
stream = await host.new_stream(peer_id, [ID_PUSH])
152154

153-
# Create the identify message
154-
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
155-
response = identify_msg.SerializeToString()
155+
# Create the identify message
156+
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
157+
response = identify_msg.SerializeToString()
156158

157-
# Send the identify message
158-
await stream.write(response)
159+
# Send the identify message
160+
await stream.write(response)
159161

160-
# Close the stream
161-
await stream.close()
162+
# Close the stream
163+
await stream.close()
162164

163-
logger.debug("Successfully pushed identify to peer %s", peer_id)
164-
return True
165-
except Exception as e:
166-
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
167-
return False
165+
logger.debug("Successfully pushed identify to peer %s", peer_id)
166+
return True
167+
except Exception as e:
168+
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
169+
return False
168170

169171

170172
async def push_identify_to_peers(
@@ -179,13 +181,10 @@ async def push_identify_to_peers(
179181
"""
180182
if peer_ids is None:
181183
# Get all connected peers
182-
peer_ids = set(host.get_peerstore().peer_ids())
184+
peer_ids = set(host.get_connected_peers())
183185

184186
# Push to each peer in parallel using a trio.Nursery
185-
# TODO: Consider using a bounded nursery to limit concurrency
186-
# and avoid overwhelming the network. This can be done by using
187-
# trio.open_nursery(max_concurrent=10) or similar.
188-
# For now, we will use an unbounded nursery for simplicity.
187+
# limiting concurrent connections to 10
189188
async with trio.open_nursery() as nursery:
190189
for peer_id in peer_ids:
191190
nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr)

0 commit comments

Comments
 (0)