|
1 | 1 | import logging
|
| 2 | +from unittest.mock import ( |
| 3 | + patch, |
| 4 | +) |
2 | 5 |
|
3 | 6 | import pytest
|
4 | 7 | import multiaddr
|
|
29 | 32 | from tests.utils.factories import (
|
30 | 33 | host_pair_factory,
|
31 | 34 | )
|
| 35 | +from tests.utils.utils import ( |
| 36 | + create_mock_connections, |
| 37 | +) |
32 | 38 |
|
33 | 39 | logger = logging.getLogger("libp2p.identity.identify-push-test")
|
34 | 40 |
|
35 | 41 |
|
| 42 | +CONCURRENCY_LIMIT = 10 |
| 43 | +LIMIT = trio.Semaphore(CONCURRENCY_LIMIT) |
| 44 | +concurrency_counter = 0 |
| 45 | +max_observed = 0 |
| 46 | +lock = trio.Lock() |
| 47 | + |
| 48 | + |
36 | 49 | @pytest.mark.trio
|
37 | 50 | async def test_identify_push_protocol(security_protocol):
|
38 | 51 | """
|
@@ -204,20 +217,20 @@ async def test_identify_push_to_peers(security_protocol):
|
204 | 217 |
|
205 | 218 | # Check that the peer is in the peerstore
|
206 | 219 | assert peer_id_a in peerstore_c.peer_ids()
|
207 |
| - |
208 |
| - # Test for push_identify to only connected peers and not all peers in peerstore |
| 220 | + |
| 221 | + # Test for push_identify to only connected peers and not all peers |
209 | 222 | # Disconnect a from c.
|
210 | 223 | await host_c.disconnect(host_a.get_id())
|
211 |
| - # |
| 224 | + # |
212 | 225 | await push_identify_to_peers(host_c)
|
213 |
| - |
| 226 | + |
214 | 227 | # Wait a bit for the push to complete
|
215 | 228 | await trio.sleep(0.1)
|
216 |
| - |
217 |
| - # Check that host_a's peerstore has not been updated with host_c's information |
218 |
| - assert(host_c.get_id() not in host_a.get_peerstore().peer_ids()) |
219 |
| - # Check that host_b's peerstore has been updated with host_c's information |
220 |
| - assert(host_c.get_id() in host_b.get_peerstore().peer_ids()) |
| 229 | + |
| 230 | + # Check that host_a's peerstore has not been updated with host_c's info |
| 231 | + assert host_c.get_id() not in host_a.get_peerstore().peer_ids() |
| 232 | + # Check that host_b's peerstore has been updated with host_c's info |
| 233 | + assert host_c.get_id() in host_b.get_peerstore().peer_ids() |
221 | 234 |
|
222 | 235 |
|
223 | 236 | @pytest.mark.trio
|
@@ -427,3 +440,63 @@ async def test_partial_update_peerstore_from_identify(security_protocol):
|
427 | 440 | host_a_public_key = host_a.get_public_key().serialize()
|
428 | 441 | peerstore_public_key = peerstore.pubkey(peer_id).serialize()
|
429 | 442 | assert host_a_public_key == peerstore_public_key
|
| 443 | + |
| 444 | + |
| 445 | +@pytest.mark.trio |
| 446 | +async def test_push_identify_to_peers_respects_concurrency_limit(): |
| 447 | + """ |
| 448 | + Test bounded concurrency for the identify/push protocol to prevent |
| 449 | + network congestion. |
| 450 | +
|
| 451 | + This test verifies: |
| 452 | + 1. The number of concurrent tasks executing the identify push is always |
| 453 | + less than or equal to CONCURRENCY_LIMIT. |
| 454 | + 2. An error is raised if concurrency exceeds the defined limit. |
| 455 | +
|
| 456 | + It mocks `push_identify_to_peer` to simulate delay using sleep, |
| 457 | + allowing the test to measure and assert actual concurrency behavior. |
| 458 | + """ |
| 459 | + |
| 460 | + # Create a mock host. |
| 461 | + key_pair_host = create_new_key_pair() |
| 462 | + host = new_host(key_pair=key_pair_host) |
| 463 | + |
| 464 | + # Create a mock network and add mock connections to the host |
| 465 | + host.get_network().connections = create_mock_connections() |
| 466 | + with patch( |
| 467 | + "libp2p.identity.identify_push.identify_push.push_identify_to_peer", |
| 468 | + new=mock_push_identify_to_peer, |
| 469 | + ): |
| 470 | + await push_identify_to_peers(host) |
| 471 | + assert ( |
| 472 | + max_observed <= CONCURRENCY_LIMIT |
| 473 | + ), f"Max concurrency observed: {max_observed}" |
| 474 | + |
| 475 | + |
| 476 | +async def mock_push_identify_to_peer(host, peer_id, observed_multiaddr=None) -> bool: |
| 477 | + """ |
| 478 | + Mock function to test concurrency by simulating an identify message. |
| 479 | +
|
| 480 | + This function patches push_identify_to_peer for testing purpose |
| 481 | +
|
| 482 | + Returns |
| 483 | + ------- |
| 484 | + bool |
| 485 | + True if the push was successful, False otherwise. |
| 486 | + """ |
| 487 | + global concurrency_counter, max_observed |
| 488 | + |
| 489 | + async with LIMIT: |
| 490 | + async with lock: |
| 491 | + concurrency_counter += 1 |
| 492 | + if concurrency_counter > CONCURRENCY_LIMIT: |
| 493 | + raise RuntimeError(f"Concurrency limit exceeded: {concurrency_counter}") |
| 494 | + max_observed = max(max_observed, concurrency_counter) |
| 495 | + |
| 496 | + logger.debug("Successfully pushed identify to peer %s", peer_id) |
| 497 | + await trio.sleep(0.05) |
| 498 | + |
| 499 | + async with lock: |
| 500 | + concurrency_counter -= 1 |
| 501 | + |
| 502 | + return True |
0 commit comments