Skip to content

Commit a7d122a

Browse files
committed
added extra tests for identifu push for concurrency cap
1 parent 8bfd4bd commit a7d122a

File tree

3 files changed

+111
-10
lines changed

3 files changed

+111
-10
lines changed

libp2p/identity/identify_push/identify_push.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,7 @@ async def push_identify_to_peers(
176176
host: IHost,
177177
peer_ids: set[ID] | None = None,
178178
observed_multiaddr: Multiaddr | None = None,
179-
counter: dict[str, int] | None = None,
180-
lock: trio.Lock | None = None,
181-
limit: int = CONCURRENCY_LIMIT,
182-
) -> int: # <-- return the max concurrency
179+
) -> None:
183180
"""
184181
Push an identify message to multiple peers in parallel.
185182
@@ -193,6 +190,4 @@ async def push_identify_to_peers(
193190
# limiting concurrent connections to 10
194191
async with trio.open_nursery() as nursery:
195192
for peer_id in peer_ids:
196-
nursery.start_soon(limited_push, peer_id)
197-
198-
return counter["max"] if counter else 0
193+
nursery.start_soon(push_identify_to_peer, host, peer_id, observed_multiaddr)

tests/core/identity/identify_push/test_identify_push.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from libp2p import (
1111
new_host,
1212
)
13-
from libp2p.abc import IHost
1413
from libp2p.crypto.secp256k1 import (
1514
create_new_key_pair,
1615
)
@@ -36,6 +35,8 @@
3635
)
3736
from tests.utils.utils import (
3837
create_mock_connections,
38+
run_host_forever,
39+
wait_until_listening,
3940
)
4041

4142
logger = logging.getLogger("libp2p.identity.identify-push-test")
@@ -504,3 +505,91 @@ async def mock_push_identify_to_peer(
504505
assert state["max_observed"] <= CONCURRENCY_LIMIT, (
505506
f"Max concurrency observed: {state['max_observed']}"
506507
)
508+
509+
510+
@pytest.mark.trio
511+
async def test_all_peers_receive_identify_push_with_semaphore(security_protocol):
512+
dummy_peers = []
513+
514+
async with host_pair_factory(security_protocol=security_protocol) as (host_a, _):
515+
# Create dummy peers
516+
for _ in range(50):
517+
key_pair = create_new_key_pair()
518+
dummy_host = new_host(key_pair=key_pair)
519+
dummy_host.set_stream_handler(
520+
ID_PUSH, identify_push_handler_for(dummy_host)
521+
)
522+
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
523+
dummy_peers.append((dummy_host, listen_addr))
524+
525+
async with trio.open_nursery() as nursery:
526+
# Start all dummy hosts
527+
for host, listen_addr in dummy_peers:
528+
nursery.start_soon(run_host_forever, host, listen_addr)
529+
530+
# Wait for all hosts to finish setting up listeners
531+
for host, _ in dummy_peers:
532+
await wait_until_listening(host)
533+
534+
# Now connect host_a → dummy peers
535+
for host, _ in dummy_peers:
536+
await host_a.connect(info_from_p2p_addr(host.get_addrs()[0]))
537+
538+
await push_identify_to_peers(
539+
host_a,
540+
)
541+
542+
await trio.sleep(0.5)
543+
544+
peer_id_a = host_a.get_id()
545+
for host, _ in dummy_peers:
546+
dummy_peerstore = host.get_peerstore()
547+
assert peer_id_a in dummy_peerstore.peer_ids()
548+
549+
nursery.cancel_scope.cancel()
550+
551+
552+
@pytest.mark.trio
553+
async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_load(
554+
security_protocol,
555+
):
556+
dummy_peers = []
557+
558+
async with host_pair_factory(security_protocol=security_protocol) as (host_a, _):
559+
# Create dummy peers
560+
# Breaking with more than 500 peers
561+
# Trio have a async tasks limit of 1000
562+
for _ in range(499):
563+
key_pair = create_new_key_pair()
564+
dummy_host = new_host(key_pair=key_pair)
565+
dummy_host.set_stream_handler(
566+
ID_PUSH, identify_push_handler_for(dummy_host)
567+
)
568+
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
569+
dummy_peers.append((dummy_host, listen_addr))
570+
571+
async with trio.open_nursery() as nursery:
572+
# Start all dummy hosts
573+
for host, listen_addr in dummy_peers:
574+
nursery.start_soon(run_host_forever, host, listen_addr)
575+
576+
# Wait for all hosts to finish setting up listeners
577+
for host, _ in dummy_peers:
578+
await wait_until_listening(host)
579+
580+
# Now connect host_a → dummy peers
581+
for host, _ in dummy_peers:
582+
await host_a.connect(info_from_p2p_addr(host.get_addrs()[0]))
583+
584+
await push_identify_to_peers(
585+
host_a,
586+
)
587+
588+
await trio.sleep(0.5)
589+
590+
peer_id_a = host_a.get_id()
591+
for host, _ in dummy_peers:
592+
dummy_peerstore = host.get_peerstore()
593+
assert peer_id_a in dummy_peerstore.peer_ids()
594+
595+
nursery.cancel_scope.cancel()

tests/utils/utils.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,30 @@
22
MagicMock,
33
)
44

5+
import trio
56

6-
def create_mock_connections() -> dict:
7+
from libp2p.abc import IHost
8+
9+
10+
def create_mock_connections(count: int = 50) -> dict:
711
connections = {}
812

9-
for i in range(1, 31):
13+
for i in range(1, count):
1014
peer_id = f"peer-{i}"
1115
mock_conn = MagicMock(name=f"INetConn-{i}")
1216
connections[peer_id] = mock_conn
1317

1418
return connections
19+
20+
21+
async def run_host_forever(host: IHost, addr):
22+
async with host.run([addr]):
23+
await trio.sleep_forever()
24+
25+
26+
async def wait_until_listening(host, timeout=3):
27+
with trio.move_on_after(timeout):
28+
while not host.get_addrs():
29+
await trio.sleep(0.05)
30+
return
31+
raise RuntimeError("Timed out waiting for host to get an address")

0 commit comments

Comments
 (0)