@@ -12,34 +12,43 @@ async def _instant_backoff(max_retries, base=1.0, factor=2.0, sleep=False):
1212 yield base * (factor ** attempt )
1313
1414
15+ @pytest .fixture
16+ def connection_manager (request ):
17+ """Create a ConnectionManager with mocked heavy dependencies.
18+
19+ Accepts max_join_retries via indirect parametrize, defaults to 3.
20+ """
21+ max_join_retries = getattr (request , "param" , 3 )
22+ with (
23+ patch ("getstream.video.rtc.connection_manager.PeerConnectionManager" ),
24+ patch ("getstream.video.rtc.connection_manager.NetworkMonitor" ),
25+ patch ("getstream.video.rtc.connection_manager.ReconnectionManager" ),
26+ patch ("getstream.video.rtc.connection_manager.RecordingManager" ),
27+ patch ("getstream.video.rtc.connection_manager.SubscriptionManager" ),
28+ patch ("getstream.video.rtc.connection_manager.ParticipantsState" ),
29+ patch ("getstream.video.rtc.connection_manager.Tracer" ),
30+ patch ("getstream.video.rtc.connection_manager.exp_backoff" , _instant_backoff ),
31+ ):
32+ mock_call = MagicMock ()
33+ mock_call .call_type = "default"
34+ mock_call .id = "test_call"
35+ cm = ConnectionManager (
36+ call = mock_call , user_id = "user1" , max_join_retries = max_join_retries
37+ )
38+ cm ._connect_coordinator_ws = AsyncMock ()
39+ yield cm
40+
41+
1542class TestConnectRetry :
1643 """Tests for connect() retry logic when SFU is full."""
1744
18- def _make_connection_manager (self , max_join_retries = 3 ):
19- """Create a ConnectionManager with mocked dependencies."""
20- with (
21- patch ("getstream.video.rtc.connection_manager.PeerConnectionManager" ),
22- patch ("getstream.video.rtc.connection_manager.NetworkMonitor" ),
23- patch ("getstream.video.rtc.connection_manager.ReconnectionManager" ),
24- patch ("getstream.video.rtc.connection_manager.RecordingManager" ),
25- patch ("getstream.video.rtc.connection_manager.SubscriptionManager" ),
26- patch ("getstream.video.rtc.connection_manager.ParticipantsState" ),
27- patch ("getstream.video.rtc.connection_manager.Tracer" ),
28- ):
29- mock_call = MagicMock ()
30- mock_call .call_type = "default"
31- mock_call .id = "test_call"
32- cm = ConnectionManager (
33- call = mock_call , user_id = "user1" , max_join_retries = max_join_retries
34- )
35- return cm
36-
3745 @pytest .mark .asyncio
38- @patch ("getstream.video.rtc.connection_manager.exp_backoff" , _instant_backoff )
39- async def test_retries_on_sfu_join_error_and_passes_failed_sfus (self ):
46+ @pytest .mark .parametrize ("connection_manager" , [2 ], indirect = True )
47+ async def test_retries_on_sfu_join_error_and_passes_failed_sfus (
48+ self , connection_manager
49+ ):
4050 """When SFU is full, connect() should retry with migrating_from_list."""
41- cm = self ._make_connection_manager (max_join_retries = 2 )
42-
51+ cm = connection_manager
4352 call_count = 0
4453 received_migrating_from_list = []
4554
@@ -49,7 +58,6 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs):
4958 received_migrating_from_list .append (migrating_from_list )
5059
5160 if call_count <= 2 :
52- # Simulate SFU assigning an edge_name before failing
5361 mock_join_response = MagicMock ()
5462 mock_join_response .credentials .server .edge_name = (
5563 f"sfu-node-{ call_count } "
@@ -60,27 +68,22 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs):
6068 error_code = models_pb2 .ERROR_CODE_SFU_FULL ,
6169 should_retry = True ,
6270 )
63- # Third attempt succeeds
6471 cm .running = True
6572
6673 cm ._connect_internal = mock_connect_internal
67- cm ._connect_coordinator_ws = AsyncMock ()
6874
6975 await cm .connect ()
7076
7177 assert call_count == 3
72- # First attempt: no failed SFUs
7378 assert received_migrating_from_list [0 ] is None
74- # Second attempt: first SFU in the exclude list
7579 assert "sfu-node-1" in received_migrating_from_list [1 ]
76- # Third attempt: both SFUs in the exclude list
7780 assert received_migrating_from_list [2 ] == ["sfu-node-1" , "sfu-node-2" ]
7881
7982 @pytest .mark .asyncio
80- @patch ( "getstream.video.rtc. connection_manager.exp_backoff " , _instant_backoff )
81- async def test_raises_after_all_retries_exhausted (self ):
83+ @pytest . mark . parametrize ( " connection_manager" , [ 1 ], indirect = True )
84+ async def test_raises_after_all_retries_exhausted (self , connection_manager ):
8285 """When all retries are exhausted, connect() should raise SfuJoinError."""
83- cm = self . _make_connection_manager ( max_join_retries = 1 )
86+ cm = connection_manager
8487
8588 async def always_fail (migrating_from_list = None , ** kwargs ):
8689 mock_join_response = MagicMock ()
@@ -93,16 +96,14 @@ async def always_fail(migrating_from_list=None, **kwargs):
9396 )
9497
9598 cm ._connect_internal = always_fail
96- cm ._connect_coordinator_ws = AsyncMock ()
9799
98100 with pytest .raises (SfuJoinError ):
99101 await cm .connect ()
100102
101103 @pytest .mark .asyncio
102- async def test_non_retryable_error_propagates_immediately (self ):
104+ async def test_non_retryable_error_propagates_immediately (self , connection_manager ):
103105 """Non-retryable errors should not trigger retry."""
104- cm = self ._make_connection_manager (max_join_retries = 3 )
105-
106+ cm = connection_manager
106107 call_count = 0
107108
108109 async def fail_with_generic_error (migrating_from_list = None , ** kwargs ):
@@ -111,27 +112,23 @@ async def fail_with_generic_error(migrating_from_list=None, **kwargs):
111112 raise SfuConnectionError ("something went wrong" )
112113
113114 cm ._connect_internal = fail_with_generic_error
114- cm ._connect_coordinator_ws = AsyncMock ()
115115
116116 with pytest .raises (SfuConnectionError ):
117117 await cm .connect ()
118118
119- # Should not retry — only one call
120119 assert call_count == 1
121120
122121 @pytest .mark .asyncio
123- @patch ( "getstream.video.rtc. connection_manager.exp_backoff " , _instant_backoff )
124- async def test_cleans_up_ws_client_between_retries (self ):
122+ @pytest . mark . parametrize ( " connection_manager" , [ 1 ], indirect = True )
123+ async def test_cleans_up_ws_client_between_retries (self , connection_manager ):
125124 """Partial WS state should be cleaned up before retry."""
126- cm = self ._make_connection_manager (max_join_retries = 1 )
127-
125+ cm = connection_manager
128126 call_count = 0
129127
130128 async def mock_connect_internal (migrating_from_list = None , ** kwargs ):
131129 nonlocal call_count
132130 call_count += 1
133131 if call_count == 1 :
134- # Simulate partial WS connection
135132 cm ._ws_client = MagicMock ()
136133 mock_join_response = MagicMock ()
137134 mock_join_response .credentials .server .edge_name = "sfu-node-1"
@@ -141,11 +138,9 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs):
141138 error_code = models_pb2 .ERROR_CODE_SFU_FULL ,
142139 should_retry = True ,
143140 )
144- # Second attempt: ws_client should have been cleaned up
145141 cm .running = True
146142
147143 cm ._connect_internal = mock_connect_internal
148- cm ._connect_coordinator_ws = AsyncMock ()
149144
150145 await cm .connect ()
151146
0 commit comments