Skip to content

Commit 51313a5

Browse files
authored
Merge branch 'main' into main
2 parents 5ac4fc1 + 8d2b889 commit 51313a5

File tree

3 files changed

+110
-2
lines changed

3 files changed

+110
-2
lines changed

newsfragments/708.performance.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added extra tests for identify push concurrency cap under high peer load

tests/core/identity/identify_push/test_identify_push.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
)
3636
from tests.utils.utils import (
3737
create_mock_connections,
38+
run_host_forever,
39+
wait_until_listening,
3840
)
3941

4042
logger = logging.getLogger("libp2p.identity.identify-push-test")
@@ -503,3 +505,91 @@ async def mock_push_identify_to_peer(
503505
assert state["max_observed"] <= CONCURRENCY_LIMIT, (
504506
f"Max concurrency observed: {state['max_observed']}"
505507
)
508+
509+
510+
@pytest.mark.trio
511+
async def test_all_peers_receive_identify_push_with_semaphore(security_protocol):
512+
dummy_peers = []
513+
514+
async with host_pair_factory(security_protocol=security_protocol) as (host_a, _):
515+
# Create dummy peers
516+
for _ in range(50):
517+
key_pair = create_new_key_pair()
518+
dummy_host = new_host(key_pair=key_pair)
519+
dummy_host.set_stream_handler(
520+
ID_PUSH, identify_push_handler_for(dummy_host)
521+
)
522+
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
523+
dummy_peers.append((dummy_host, listen_addr))
524+
525+
async with trio.open_nursery() as nursery:
526+
# Start all dummy hosts
527+
for host, listen_addr in dummy_peers:
528+
nursery.start_soon(run_host_forever, host, listen_addr)
529+
530+
# Wait for all hosts to finish setting up listeners
531+
for host, _ in dummy_peers:
532+
await wait_until_listening(host)
533+
534+
# Now connect host_a → dummy peers
535+
for host, _ in dummy_peers:
536+
await host_a.connect(info_from_p2p_addr(host.get_addrs()[0]))
537+
538+
await push_identify_to_peers(
539+
host_a,
540+
)
541+
542+
await trio.sleep(0.5)
543+
544+
peer_id_a = host_a.get_id()
545+
for host, _ in dummy_peers:
546+
dummy_peerstore = host.get_peerstore()
547+
assert peer_id_a in dummy_peerstore.peer_ids()
548+
549+
nursery.cancel_scope.cancel()
550+
551+
552+
@pytest.mark.trio
553+
async def test_all_peers_receive_identify_push_with_semaphore_under_high_peer_load(
554+
security_protocol,
555+
):
556+
dummy_peers = []
557+
558+
async with host_pair_factory(security_protocol=security_protocol) as (host_a, _):
559+
# Create dummy peers
560+
# Breaking with more than 500 peers
561+
# Trio have a async tasks limit of 1000
562+
for _ in range(499):
563+
key_pair = create_new_key_pair()
564+
dummy_host = new_host(key_pair=key_pair)
565+
dummy_host.set_stream_handler(
566+
ID_PUSH, identify_push_handler_for(dummy_host)
567+
)
568+
listen_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
569+
dummy_peers.append((dummy_host, listen_addr))
570+
571+
async with trio.open_nursery() as nursery:
572+
# Start all dummy hosts
573+
for host, listen_addr in dummy_peers:
574+
nursery.start_soon(run_host_forever, host, listen_addr)
575+
576+
# Wait for all hosts to finish setting up listeners
577+
for host, _ in dummy_peers:
578+
await wait_until_listening(host)
579+
580+
# Now connect host_a → dummy peers
581+
for host, _ in dummy_peers:
582+
await host_a.connect(info_from_p2p_addr(host.get_addrs()[0]))
583+
584+
await push_identify_to_peers(
585+
host_a,
586+
)
587+
588+
await trio.sleep(0.5)
589+
590+
peer_id_a = host_a.get_id()
591+
for host, _ in dummy_peers:
592+
dummy_peerstore = host.get_peerstore()
593+
assert peer_id_a in dummy_peerstore.peer_ids()
594+
595+
nursery.cancel_scope.cancel()

tests/utils/utils.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,30 @@
22
MagicMock,
33
)
44

5+
import trio
56

6-
def create_mock_connections() -> dict:
7+
from libp2p.abc import IHost
8+
9+
10+
def create_mock_connections(count: int = 50) -> dict:
711
connections = {}
812

9-
for i in range(1, 31):
13+
for i in range(1, count):
1014
peer_id = f"peer-{i}"
1115
mock_conn = MagicMock(name=f"INetConn-{i}")
1216
connections[peer_id] = mock_conn
1317

1418
return connections
19+
20+
21+
async def run_host_forever(host: IHost, addr):
22+
async with host.run([addr]):
23+
await trio.sleep_forever()
24+
25+
26+
async def wait_until_listening(host, timeout=3):
27+
with trio.move_on_after(timeout):
28+
while not host.get_addrs():
29+
await trio.sleep(0.05)
30+
return
31+
raise RuntimeError("Timed out waiting for host to get an address")

0 commit comments

Comments
 (0)