@@ -69,14 +69,30 @@ async def test_exchangews_ohlcv(mocker, time_machine, caplog):
6969 ccxt_object = MagicMock ()
7070 caplog .set_level (logging .DEBUG )
7171
72- async def sleeper (* args , ** kwargs ):
73- # pass
74- await asyncio .sleep (0.12 )
72+ # Create synchronization events for deterministic testing
73+ watch_call_event = asyncio .Event ()
74+ watch_call_count = 0
75+
76+ async def controlled_sleeper (* args , ** kwargs ):
77+ """Controlled async function that signals when called."""
78+ nonlocal watch_call_count
79+ watch_call_count += 1
80+ # Signal that a watch call happened
81+ watch_call_event .set ()
82+ await asyncio .sleep (0.01 ) # Minimal delay for realism
7583 return MagicMock ()
7684
77- ccxt_object .un_watch_ohlcv_for_symbols = AsyncMock (side_effect = NotSupported )
85+ async def wait_for_condition (condition_func , timeout = 5.0 , check_interval = 0.01 ):
86+ """Wait for a condition to be true with timeout."""
87+ start_time = asyncio .get_event_loop ().time ()
88+ while asyncio .get_event_loop ().time () - start_time < timeout :
89+ if condition_func ():
90+ return True
91+ await asyncio .sleep (check_interval )
92+ return False
7893
79- ccxt_object .watch_ohlcv = AsyncMock (side_effect = sleeper )
94+ ccxt_object .un_watch_ohlcv_for_symbols = AsyncMock (side_effect = NotSupported )
95+ ccxt_object .watch_ohlcv = AsyncMock (side_effect = controlled_sleeper )
8096 ccxt_object .close = AsyncMock ()
8197 time_machine .move_to ("2024-11-01 01:00:02 +00:00" )
8298
@@ -90,7 +106,14 @@ async def sleeper(*args, **kwargs):
90106
91107 exchange_ws .schedule_ohlcv ("ETH/BTC" , "1m" , CandleType .SPOT )
92108 exchange_ws .schedule_ohlcv ("XRP/BTC" , "1m" , CandleType .SPOT )
93- await asyncio .sleep (0.2 )
109+
110+ # Wait for both pairs to be properly scheduled and watching
111+ await wait_for_condition (
112+ lambda : (
113+ len (exchange_ws ._klines_watching ) == 2 and len (exchange_ws ._klines_scheduled ) == 2
114+ ),
115+ timeout = 2.0 ,
116+ )
94117
95118 assert exchange_ws ._klines_watching == {
96119 ("ETH/BTC" , "1m" , CandleType .SPOT ),
@@ -100,22 +123,31 @@ async def sleeper(*args, **kwargs):
100123 ("ETH/BTC" , "1m" , CandleType .SPOT ),
101124 ("XRP/BTC" , "1m" , CandleType .SPOT ),
102125 }
103- await asyncio .sleep (0.1 )
126+
127+ # Wait for the expected number of watch calls (should be 6 based on original test logic)
128+ await wait_for_condition (lambda : ccxt_object .watch_ohlcv .call_count >= 6 , timeout = 3.0 )
104129 assert ccxt_object .watch_ohlcv .call_count == 6
105130 ccxt_object .watch_ohlcv .reset_mock ()
106131
107132 time_machine .shift (timedelta (minutes = 5 ))
108133 exchange_ws .schedule_ohlcv ("ETH/BTC" , "1m" , CandleType .SPOT )
109- await asyncio .sleep (1 )
110- assert log_has_re ("un_watch_ohlcv_for_symbols not supported: " , caplog )
111- # XRP/BTC should be cleaned up.
112- assert exchange_ws ._klines_watching == {
113- ("ETH/BTC" , "1m" , CandleType .SPOT ),
114- }
134+
135+ # Wait for log message and state changes with timeout
136+ await wait_for_condition (
137+ lambda : log_has_re ("un_watch_ohlcv_for_symbols not supported: " , caplog ), timeout = 2.0
138+ )
139+
140+ # Wait for XRP/BTC cleanup
141+ await wait_for_condition (
142+ lambda : exchange_ws ._klines_watching == {("ETH/BTC" , "1m" , CandleType .SPOT )},
143+ timeout = 2.0 ,
144+ )
115145
116146 # Cleanup happened.
117147 ccxt_object .un_watch_ohlcv_for_symbols = AsyncMock (side_effect = ValueError )
118148 exchange_ws .schedule_ohlcv ("ETH/BTC" , "1m" , CandleType .SPOT )
149+
150+ # Verify final state
119151 assert exchange_ws ._klines_watching == {
120152 ("ETH/BTC" , "1m" , CandleType .SPOT ),
121153 }
0 commit comments