|
1 | 1 | import logging
|
| 2 | +from unittest.mock import ( |
| 3 | + patch, |
| 4 | +) |
2 | 5 |
|
3 | 6 | import pytest
|
4 | 7 | import multiaddr
|
|
17 | 20 | Identify,
|
18 | 21 | )
|
19 | 22 | from libp2p.identity.identify_push.identify_push import (
|
| 23 | + CONCURRENCY_LIMIT, |
20 | 24 | ID_PUSH,
|
21 | 25 | _update_peerstore_from_identify,
|
22 | 26 | identify_push_handler_for,
|
|
29 | 33 | from tests.utils.factories import (
|
30 | 34 | host_pair_factory,
|
31 | 35 | )
|
| 36 | +from tests.utils.utils import ( |
| 37 | + create_mock_connections, |
| 38 | +) |
32 | 39 |
|
33 | 40 | logger = logging.getLogger("libp2p.identity.identify-push-test")
|
34 | 41 |
|
@@ -175,6 +182,7 @@ async def test_identify_push_to_peers(security_protocol):
|
175 | 182 | host_c = new_host(key_pair=key_pair_c)
|
176 | 183 |
|
177 | 184 | # Set up the identify/push handlers
|
| 185 | + host_a.set_stream_handler(ID_PUSH, identify_push_handler_for(host_a)) |
178 | 186 | host_b.set_stream_handler(ID_PUSH, identify_push_handler_for(host_b))
|
179 | 187 | host_c.set_stream_handler(ID_PUSH, identify_push_handler_for(host_c))
|
180 | 188 |
|
@@ -204,6 +212,20 @@ async def test_identify_push_to_peers(security_protocol):
|
204 | 212 | # Check that the peer is in the peerstore
|
205 | 213 | assert peer_id_a in peerstore_c.peer_ids()
|
206 | 214 |
|
| 215 | + # Test for push_identify to only connected peers and not all peers |
| 216 | + # Disconnect a from c. |
| 217 | + await host_c.disconnect(host_a.get_id()) |
| 218 | + |
| 219 | + await push_identify_to_peers(host_c) |
| 220 | + |
| 221 | + # Wait a bit for the push to complete |
| 222 | + await trio.sleep(0.1) |
| 223 | + |
| 224 | + # Check that host_a's peerstore has not been updated with host_c's info |
| 225 | + assert host_c.get_id() not in host_a.get_peerstore().peer_ids() |
| 226 | + # Check that host_b's peerstore has been updated with host_c's info |
| 227 | + assert host_c.get_id() in host_b.get_peerstore().peer_ids() |
| 228 | + |
207 | 229 |
|
208 | 230 | @pytest.mark.trio
|
209 | 231 | async def test_push_identify_to_peers_with_explicit_params(security_protocol):
|
@@ -412,3 +434,72 @@ async def test_partial_update_peerstore_from_identify(security_protocol):
|
412 | 434 | host_a_public_key = host_a.get_public_key().serialize()
|
413 | 435 | peerstore_public_key = peerstore.pubkey(peer_id).serialize()
|
414 | 436 | assert host_a_public_key == peerstore_public_key
|
| 437 | + |
| 438 | + |
| 439 | +@pytest.mark.trio |
| 440 | +async def test_push_identify_to_peers_respects_concurrency_limit(): |
| 441 | + """ |
| 442 | + Test bounded concurrency for the identify/push protocol to prevent |
| 443 | + network congestion. |
| 444 | +
|
| 445 | + This test verifies: |
| 446 | + 1. The number of concurrent tasks executing the identify push is always |
| 447 | + less than or equal to CONCURRENCY_LIMIT. |
| 448 | + 2. An error is raised if concurrency exceeds the defined limit. |
| 449 | +
|
| 450 | + It mocks `push_identify_to_peer` to simulate delay using sleep, |
| 451 | + allowing the test to measure and assert actual concurrency behavior. |
| 452 | + """ |
| 453 | + state = { |
| 454 | + "concurrency_counter": 0, |
| 455 | + "max_observed": 0, |
| 456 | + } |
| 457 | + lock = trio.Lock() |
| 458 | + |
| 459 | + async def mock_push_identify_to_peer( |
| 460 | + host, peer_id, observed_multiaddr=None, limit=trio.Semaphore(CONCURRENCY_LIMIT) |
| 461 | + ) -> bool: |
| 462 | + """ |
| 463 | + Mock function to test concurrency by simulating an identify message. |
| 464 | +
|
| 465 | + This function patches push_identify_to_peer for testing purpose |
| 466 | +
|
| 467 | + Returns |
| 468 | + ------- |
| 469 | + bool |
| 470 | + True if the push was successful, False otherwise. |
| 471 | +
|
| 472 | + """ |
| 473 | + async with limit: |
| 474 | + async with lock: |
| 475 | + state["concurrency_counter"] += 1 |
| 476 | + if state["concurrency_counter"] > CONCURRENCY_LIMIT: |
| 477 | + raise RuntimeError( |
| 478 | + f"Concurrency limit exceeded: {state['concurrency_counter']}" |
| 479 | + ) |
| 480 | + state["max_observed"] = max( |
| 481 | + state["max_observed"], state["concurrency_counter"] |
| 482 | + ) |
| 483 | + |
| 484 | + logger.debug("Successfully pushed identify to peer %s", peer_id) |
| 485 | + await trio.sleep(0.05) |
| 486 | + |
| 487 | + async with lock: |
| 488 | + state["concurrency_counter"] -= 1 |
| 489 | + |
| 490 | + return True |
| 491 | + |
| 492 | + # Create a mock host. |
| 493 | + key_pair_host = create_new_key_pair() |
| 494 | + host = new_host(key_pair=key_pair_host) |
| 495 | + |
| 496 | + # Create a mock network and add mock connections to the host |
| 497 | + host.get_network().connections = create_mock_connections() |
| 498 | + with patch( |
| 499 | + "libp2p.identity.identify_push.identify_push.push_identify_to_peer", |
| 500 | + new=mock_push_identify_to_peer, |
| 501 | + ): |
| 502 | + await push_identify_to_peers(host) |
| 503 | + assert state["max_observed"] <= CONCURRENCY_LIMIT, ( |
| 504 | + f"Max concurrency observed: {state['max_observed']}" |
| 505 | + ) |
0 commit comments