Skip to content

Commit f96fe0c

Browse files
authored
Merge branch 'main' into main
2 parents 5a95212 + 3403689 commit f96fe0c

File tree

5 files changed

+130
-23
lines changed

5 files changed

+130
-23
lines changed

libp2p/identity/identify_push/identify_push.py

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
ID_PUSH = TProtocol("/ipfs/id/push/1.0.0")
4141
PROTOCOL_VERSION = "ipfs/0.1.0"
4242
AGENT_VERSION = get_agent_version()
43+
CONCURRENCY_LIMIT = 10
4344

4445

4546
def identify_push_handler_for(host: IHost) -> StreamHandlerFn:
@@ -132,7 +133,10 @@ async def _update_peerstore_from_identify(
132133

133134

134135
async def push_identify_to_peer(
135-
host: IHost, peer_id: ID, observed_multiaddr: Multiaddr | None = None
136+
host: IHost,
137+
peer_id: ID,
138+
observed_multiaddr: Multiaddr | None = None,
139+
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
136140
) -> bool:
137141
"""
138142
Push an identify message to a specific peer.
@@ -146,25 +150,26 @@ async def push_identify_to_peer(
146150
True if the push was successful, False otherwise.
147151
148152
"""
149-
try:
150-
# Create a new stream to the peer using the identify/push protocol
151-
stream = await host.new_stream(peer_id, [ID_PUSH])
153+
async with limit:
154+
try:
155+
# Create a new stream to the peer using the identify/push protocol
156+
stream = await host.new_stream(peer_id, [ID_PUSH])
152157

153-
# Create the identify message
154-
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
155-
response = identify_msg.SerializeToString()
158+
# Create the identify message
159+
identify_msg = _mk_identify_protobuf(host, observed_multiaddr)
160+
response = identify_msg.SerializeToString()
156161

157-
# Send the identify message
158-
await stream.write(response)
162+
# Send the identify message
163+
await stream.write(response)
159164

160-
# Close the stream
161-
await stream.close()
165+
# Close the stream
166+
await stream.close()
162167

163-
logger.debug("Successfully pushed identify to peer %s", peer_id)
164-
return True
165-
except Exception as e:
166-
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
167-
return False
168+
logger.debug("Successfully pushed identify to peer %s", peer_id)
169+
return True
170+
except Exception as e:
171+
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
172+
return False
168173

169174

170175
async def push_identify_to_peers(
@@ -179,13 +184,10 @@ async def push_identify_to_peers(
179184
"""
180185
if peer_ids is None:
181186
# Get all connected peers
182-
peer_ids = set(host.get_peerstore().peer_ids())
187+
peer_ids = set(host.get_connected_peers())
183188

184189
# Push to each peer in parallel using a trio.Nursery
185-
# TODO: Consider using a bounded nursery to limit concurrency
186-
# and avoid overwhelming the network. This can be done by using
187-
# trio.open_nursery(max_concurrent=10) or similar.
188-
# For now, we will use an unbounded nursery for simplicity.
190+
# limiting concurrent connections to 10
189191
async with trio.open_nursery() as nursery:
190192
for peer_id in peer_ids:
191193
nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr)

newsfragments/621.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Limit concurrency in `push_identify_to_peers` to prevent resource congestion under high peer counts.

pyproject.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
[build-system]
32
requires = ["setuptools>=42", "wheel"]
43
build-backend = "setuptools.build_meta"
@@ -23,7 +22,7 @@ dependencies = [
2322
"multiaddr>=0.0.9",
2423
"mypy-protobuf>=3.0.0",
2524
"noiseprotocol>=0.3.0",
26-
"protobuf>=3.20.1,<4.0.0",
25+
"protobuf>=4.21.0,<5.0.0",
2726
"pycryptodome>=3.9.2",
2827
"pymultihash>=0.8.2",
2928
"pynacl>=1.3.0",

tests/core/identity/identify_push/test_identify_push.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import logging
2+
from unittest.mock import (
3+
patch,
4+
)
25

36
import pytest
47
import multiaddr
@@ -17,6 +20,7 @@
1720
Identify,
1821
)
1922
from libp2p.identity.identify_push.identify_push import (
23+
CONCURRENCY_LIMIT,
2024
ID_PUSH,
2125
_update_peerstore_from_identify,
2226
identify_push_handler_for,
@@ -29,6 +33,9 @@
2933
from tests.utils.factories import (
3034
host_pair_factory,
3135
)
36+
from tests.utils.utils import (
37+
create_mock_connections,
38+
)
3239

3340
logger = logging.getLogger("libp2p.identity.identify-push-test")
3441

@@ -175,6 +182,7 @@ async def test_identify_push_to_peers(security_protocol):
175182
host_c = new_host(key_pair=key_pair_c)
176183

177184
# Set up the identify/push handlers
185+
host_a.set_stream_handler(ID_PUSH, identify_push_handler_for(host_a))
178186
host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b))
179187
host_c.set_stream_handler(ID_PUSH, identify_push_handler_for(host_c))
180188

@@ -204,6 +212,20 @@ async def test_identify_push_to_peers(security_protocol):
204212
# Check that the peer is in the peerstore
205213
assert peer_id_a in peerstore_c.peer_ids()
206214

215+
# Test for push_identify to only connected peers and not all peers
216+
# Disconnect a from c.
217+
await host_c.disconnect(host_a.get_id())
218+
219+
await push_identify_to_peers(host_c)
220+
221+
# Wait a bit for the push to complete
222+
await trio.sleep(0.1)
223+
224+
# Check that host_a's peerstore has not been updated with host_c's info
225+
assert host_c.get_id() not in host_a.get_peerstore().peer_ids()
226+
# Check that host_b's peerstore has been updated with host_c's info
227+
assert host_c.get_id() in host_b.get_peerstore().peer_ids()
228+
207229

208230
@pytest.mark.trio
209231
async def test_push_identify_to_peers_with_explicit_params(security_protocol):
@@ -412,3 +434,72 @@ async def test_partial_update_peerstore_from_identify(security_protocol):
412434
host_a_public_key = host_a.get_public_key().serialize()
413435
peerstore_public_key = peerstore.pubkey(peer_id).serialize()
414436
assert host_a_public_key == peerstore_public_key
437+
438+
439+
@pytest.mark.trio
440+
async def test_push_identify_to_peers_respects_concurrency_limit():
441+
"""
442+
Test bounded concurrency for the identify/push protocol to prevent
443+
network congestion.
444+
445+
This test verifies:
446+
1. The number of concurrent tasks executing the identify push is always
447+
less than or equal to CONCURRENCY_LIMIT.
448+
2. An error is raised if concurrency exceeds the defined limit.
449+
450+
It mocks `push_identify_to_peer` to simulate delay using sleep,
451+
allowing the test to measure and assert actual concurrency behavior.
452+
"""
453+
state = {
454+
"concurrency_counter": 0,
455+
"max_observed": 0,
456+
}
457+
lock = trio.Lock()
458+
459+
async def mock_push_identify_to_peer(
460+
host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT)
461+
) -> bool:
462+
"""
463+
Mock function to test concurrency by simulating an identify message.
464+
465+
This function patches push_identify_to_peer for testing purpose
466+
467+
Returns
468+
-------
469+
bool
470+
True if the push was successful, False otherwise.
471+
472+
"""
473+
async with limit:
474+
async with lock:
475+
state["concurrency_counter"] += 1
476+
if state["concurrency_counter"] > CONCURRENCY_LIMIT:
477+
raise RuntimeError(
478+
f"Concurrency limit exceeded: {state['concurrency_counter']}"
479+
)
480+
state["max_observed"] = max(
481+
state["max_observed"], state["concurrency_counter"]
482+
)
483+
484+
logger.debug("Successfully pushed identify to peer %s", peer_id)
485+
await trio.sleep(0.05)
486+
487+
async with lock:
488+
state["concurrency_counter"] -= 1
489+
490+
return True
491+
492+
# Create a mock host.
493+
key_pair_host = create_new_key_pair()
494+
host = new_host(key_pair=key_pair_host)
495+
496+
# Create a mock network and add mock connections to the host
497+
host.get_network().connections = create_mock_connections()
498+
with patch(
499+
"libp2p.identity.identify_push.identify_push.push_identify_to_peer",
500+
new=mock_push_identify_to_peer,
501+
):
502+
await push_identify_to_peers(host)
503+
assert state["max_observed"] <= CONCURRENCY_LIMIT, (
504+
f"Max concurrency observed: {state['max_observed']}"
505+
)

tests/utils/utils.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from unittest.mock import (
2+
MagicMock,
3+
)
4+
5+
6+
def create_mock_connections() -> dict:
7+
connections = {}
8+
9+
for i in range(1, 31):
10+
peer_id = f"peer-{i}"
11+
mock_conn = MagicMock(name=f"INetConn-{i}")
12+
connections[peer_id] = mock_conn
13+
14+
return connections

0 commit comments

Comments
 (0)