39
39
logger = logging .getLogger ("libp2p.identity.identify-push-test" )
40
40
41
41
42
- CONCURRENCY_LIMIT = 10
43
- LIMIT = trio .Semaphore (CONCURRENCY_LIMIT )
44
- concurrency_counter = 0
45
- max_observed = 0
46
- lock = trio .Lock ()
47
-
48
-
49
42
@pytest .mark .trio
50
43
async def test_identify_push_protocol (security_protocol ):
51
44
"""
@@ -221,7 +214,7 @@ async def test_identify_push_to_peers(security_protocol):
221
214
# Test for push_identify to only connected peers and not all peers
222
215
# Disconnect a from c.
223
216
await host_c .disconnect (host_a .get_id ())
224
- #
217
+
225
218
await push_identify_to_peers (host_c )
226
219
227
220
# Wait a bit for the push to complete
@@ -456,6 +449,46 @@ async def test_push_identify_to_peers_respects_concurrency_limit():
456
449
It mocks `push_identify_to_peer` to simulate delay using sleep,
457
450
allowing the test to measure and assert actual concurrency behavior.
458
451
"""
452
+ CONCURRENCY_LIMIT = 10
453
+ LIMIT = trio .Semaphore (CONCURRENCY_LIMIT )
454
+ state = {
455
+ "concurrency_counter" : 0 ,
456
+ "max_observed" : 0 ,
457
+ }
458
+ lock = trio .Lock ()
459
+
460
+ async def mock_push_identify_to_peer (
461
+ host , peer_id , observed_multiaddr = None
462
+ ) -> bool :
463
+ """
464
+ Mock function to test concurrency by simulating an identify message.
465
+
466
+ This function patches push_identify_to_peer for testing purpose
467
+
468
+ Returns
469
+ -------
470
+ bool
471
+ True if the push was successful, False otherwise.
472
+
473
+ """
474
+ async with LIMIT :
475
+ async with lock :
476
+ state ["concurrency_counter" ] += 1
477
+ if state ["concurrency_counter" ] > CONCURRENCY_LIMIT :
478
+ raise RuntimeError (
479
+ f"Concurrency limit exceeded: { state ['concurrency_counter' ]} "
480
+ )
481
+ state ["max_observed" ] = max (
482
+ state ["max_observed" ], state ["concurrency_counter" ]
483
+ )
484
+
485
+ logger .debug ("Successfully pushed identify to peer %s" , peer_id )
486
+ await trio .sleep (0.05 )
487
+
488
+ async with lock :
489
+ state ["concurrency_counter" ] -= 1
490
+
491
+ return True
459
492
460
493
# Create a mock host.
461
494
key_pair_host = create_new_key_pair ()
@@ -468,35 +501,6 @@ async def test_push_identify_to_peers_respects_concurrency_limit():
468
501
new = mock_push_identify_to_peer ,
469
502
):
470
503
await push_identify_to_peers (host )
471
- assert (
472
- max_observed <= CONCURRENCY_LIMIT
473
- ), f"Max concurrency observed: { max_observed } "
474
-
475
-
476
- async def mock_push_identify_to_peer (host , peer_id , observed_multiaddr = None ) -> bool :
477
- """
478
- Mock function to test concurrency by simulating an identify message.
479
-
480
- This function patches push_identify_to_peer for testing purpose
481
-
482
- Returns
483
- -------
484
- bool
485
- True if the push was successful, False otherwise.
486
- """
487
- global concurrency_counter , max_observed
488
-
489
- async with LIMIT :
490
- async with lock :
491
- concurrency_counter += 1
492
- if concurrency_counter > CONCURRENCY_LIMIT :
493
- raise RuntimeError (f"Concurrency limit exceeded: { concurrency_counter } " )
494
- max_observed = max (max_observed , concurrency_counter )
495
-
496
- logger .debug ("Successfully pushed identify to peer %s" , peer_id )
497
- await trio .sleep (0.05 )
498
-
499
- async with lock :
500
- concurrency_counter -= 1
501
-
502
- return True
504
+ assert state ["max_observed" ] <= CONCURRENCY_LIMIT , (
505
+ f"Max concurrency observed: { state ['max_observed' ]} "
506
+ )
0 commit comments