1+ from collections .abc import (
2+ Callable ,
3+ )
4+
15import pytest
26import trio
37
913)
1014
1115
16+ async def wait_for_convergence (
17+ nodes : tuple [DummyAccountNode , ...],
18+ check : Callable [[DummyAccountNode ], bool ],
19+ timeout : float = 10.0 ,
20+ poll_interval : float = 0.02 ,
21+ log_success : bool = False ,
22+ raise_last_exception_on_timeout : bool = True ,
23+ ) -> None :
24+ """
25+ Wait until all nodes satisfy the check condition.
26+
27+ Returns as soon as convergence is reached, otherwise raises TimeoutError.
28+ Convergence already guarantees all nodes satisfy the check, so callers need
29+ not run a second assertion pass after this returns.
30+ """
31+ start_time = trio .current_time ()
32+
33+ last_exception : Exception | None = None
34+ last_exception_node : int | None = None
35+
36+ while True :
37+ failed_indices : list [int ] = []
38+ for i , node in enumerate (nodes ):
39+ try :
40+ ok = check (node )
41+ except Exception as exc :
42+ ok = False
43+ last_exception = exc
44+ last_exception_node = i
45+ if not ok :
46+ failed_indices .append (i )
47+
48+ if not failed_indices :
49+ elapsed = trio .current_time () - start_time
50+ if log_success :
51+ print (f"✓ Converged in { elapsed :.3f} s with { len (nodes )} nodes" )
52+ return
53+
54+ elapsed = trio .current_time () - start_time
55+ if elapsed > timeout :
56+ if raise_last_exception_on_timeout and last_exception is not None :
57+ # Preserve the underlying assertion/exception signal (and its message)
58+ # instead of hiding it behind a generic timeout.
59+ node_hint = (
60+ f" (node index { last_exception_node } )"
61+ if last_exception_node is not None
62+ else ""
63+ )
64+ raise AssertionError (
65+ f"Convergence failed{ node_hint } : { last_exception } "
66+ ) from last_exception
67+
68+ raise TimeoutError (
69+ f"Convergence timeout after { elapsed :.2f} s. "
70+ f"Failed nodes: { failed_indices } . "
71+ f"(Hint: run with -s and pass log_success=True for timing logs)"
72+ )
73+
74+ await trio .sleep (poll_interval )
75+
76+
1277async def perform_test (num_nodes , adjacency_map , action_func , assertion_func ):
1378 """
1479 Helper function to allow for easy construction of custom tests for dummy
@@ -38,12 +103,12 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
38103 # Perform action function
39104 await action_func (dummy_nodes )
40105
41- # Allow time for action function to be performed (i.e. messages to propogate)
42- await trio .sleep (1 )
106+ # Wait until all nodes satisfy the expected final state.
107+ def _check_final (node : DummyAccountNode ) -> bool :
108+ assertion_func (node )
109+ return True
43110
44- # Perform assertion function
45- for dummy_node in dummy_nodes :
46- assertion_func (dummy_node )
111+ await wait_for_convergence (dummy_nodes , _check_final , timeout = 10.0 )
47112
48113 # Success, terminate pending tasks.
49114
@@ -111,8 +176,16 @@ async def test_set_then_send_from_root_seven_nodes_tree_topography():
111176
112177 async def action_func (dummy_nodes ):
113178 await dummy_nodes [0 ].publish_set_crypto ("aspyn" , 20 )
114- await trio .sleep (0.5 ) # Increased for better tree propagation
179+ await wait_for_convergence (
180+ dummy_nodes , lambda n : n .get_balance ("aspyn" ) == 20 , timeout = 10.0
181+ )
115182 await dummy_nodes [0 ].publish_send_crypto ("aspyn" , "alex" , 5 )
183+ # Wait for the send operation to propagate to all nodes
184+ await wait_for_convergence (
185+ dummy_nodes ,
186+ lambda n : n .get_balance ("aspyn" ) == 15 and n .get_balance ("alex" ) == 5 ,
187+ timeout = 10.0 ,
188+ )
116189
117190 def assertion_func (dummy_node ):
118191 assert dummy_node .get_balance ("aspyn" ) == 15
@@ -128,7 +201,9 @@ async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography():
128201
129202 async def action_func (dummy_nodes ):
130203 await dummy_nodes [6 ].publish_set_crypto ("aspyn" , 20 )
131- await trio .sleep (0.5 ) # Increased for better tree propagation
204+ await wait_for_convergence (
205+ dummy_nodes , lambda n : n .get_balance ("aspyn" ) == 20 , timeout = 10.0
206+ )
132207 await dummy_nodes [4 ].publish_send_crypto ("aspyn" , "alex" , 5 )
133208
134209 def assertion_func (dummy_node ):
@@ -159,7 +234,12 @@ async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography():
159234
160235 async def action_func (dummy_nodes ):
161236 await dummy_nodes [0 ].publish_set_crypto ("alex" , 20 )
162- await trio .sleep (1.0 ) # Increased from 0.25 to allow proper ring propagation
237+ # Ensure `set` has reached all nodes before sending, otherwise late `set`
238+ # can overwrite the effects of `send` on nodes
239+ # that receive messages out-of-order.
240+ await wait_for_convergence (
241+ dummy_nodes , lambda n : n .get_balance ("alex" ) == 20 , timeout = 10.0
242+ )
163243 await dummy_nodes [3 ].publish_send_crypto ("alex" , "rob" , 12 )
164244
165245 def assertion_func (dummy_node ):
@@ -177,13 +257,32 @@ async def test_set_then_send_from_five_diff_nodes_five_nodes_ring_topography():
177257
178258 async def action_func (dummy_nodes ):
179259 await dummy_nodes [0 ].publish_set_crypto ("alex" , 20 )
180- await trio .sleep (1 )
260+ await wait_for_convergence (
261+ dummy_nodes , lambda n : n .get_balance ("alex" ) == 20 , timeout = 10.0
262+ )
181263 await dummy_nodes [1 ].publish_send_crypto ("alex" , "rob" , 3 )
182- await trio .sleep (1 )
264+ await wait_for_convergence (
265+ dummy_nodes ,
266+ lambda n : n .get_balance ("alex" ) == 17 and n .get_balance ("rob" ) == 3 ,
267+ timeout = 10.0 ,
268+ )
183269 await dummy_nodes [2 ].publish_send_crypto ("rob" , "aspyn" , 2 )
184- await trio .sleep (1 )
270+ await wait_for_convergence (
271+ dummy_nodes ,
272+ lambda n : n .get_balance ("alex" ) == 17
273+ and n .get_balance ("rob" ) == 1
274+ and n .get_balance ("aspyn" ) == 2 ,
275+ timeout = 10.0 ,
276+ )
185277 await dummy_nodes [3 ].publish_send_crypto ("aspyn" , "zx" , 1 )
186- await trio .sleep (1 )
278+ await wait_for_convergence (
279+ dummy_nodes ,
280+ lambda n : n .get_balance ("alex" ) == 17
281+ and n .get_balance ("rob" ) == 1
282+ and n .get_balance ("aspyn" ) == 1
283+ and n .get_balance ("zx" ) == 1 ,
284+ timeout = 10.0 ,
285+ )
187286 await dummy_nodes [4 ].publish_send_crypto ("zx" , "raul" , 1 )
188287
189288 def assertion_func (dummy_node ):
0 commit comments