@@ -37,7 +37,8 @@ all() ->
3737groups () ->
3838 [{cluster , [],
3939 [simple_sac_consumer_should_get_disconnected_on_network_partition ,
40- super_stream_sac_consumer_should_get_disconnected_on_network_partition ]}
40+ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition ,
41+ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition ]}
4142 ].
4243
4344init_per_suite (Config ) ->
@@ -80,7 +81,8 @@ init_per_testcase(TestCase, Config) ->
8081 fun (StepConfig ) ->
8182 rabbit_ct_helpers :merge_app_env (StepConfig ,
8283 {rabbit ,
83- [{stream_sac_disconnected_timeout ,
84+ [{stream_cmd_timeout , 5000 },
85+ {stream_sac_disconnected_timeout ,
8486 2000 }]})
8587 end ]
8688 ++ rabbit_ct_broker_helpers :setup_steps ()).
@@ -97,8 +99,7 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
9799 {ok , So1 , C1_00 } = stream_test_utils :connect (Config , 1 ),
98100 {ok , So2 , C2_00 } = stream_test_utils :connect (Config , 2 ),
99101
100- create_stream (Config , S ),
101- wait_for_members (So0 , C0_00 , S , 3 ),
102+ init_stream (Config , 0 , S ),
102103
103104 C0_01 = register_sac (So0 , C0_00 , S , 0 ),
104105 C0_02 = receive_consumer_update (So0 , C0_01 ),
@@ -152,7 +153,7 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
152153 Acc #{K => {S0 , C0 }}
153154 end , #{}, SubIdToState0 ),
154155
155- delete_stream (Config , S ),
156+ delete_stream (stream_port ( Config , 0 ) , S ),
156157
157158 % % online consumers should receive a metadata update frame (stream deleted)
158159 % % we unqueue the this frame before closing the connection
@@ -166,7 +167,104 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
166167
167168 ok .
168169
169- super_stream_sac_consumer_should_get_disconnected_on_network_partition (Config ) ->
170+ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition (Config ) ->
171+ % % to initialize the stream coordinator on the first node
172+ init_stream (Config , 0 , <<" dummy" >>),
173+ delete_stream (stream_port (Config , 0 ), <<" dummy" >>),
174+
175+ S = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
176+
177+ init_stream (Config , 1 , S ),
178+ [L , _F1 , _F2 ] = topology (Config , S ),
179+
180+ wait_for_coordinator_ready (Config ),
181+
182+ {ok , So0 , C0_00 } = stream_test_utils :connect (Config , 0 ),
183+ {ok , So1 , C1_00 } = stream_test_utils :connect (Config , 1 ),
184+ {ok , So2 , C2_00 } = stream_test_utils :connect (Config , 2 ),
185+
186+ C0_01 = register_sac (So0 , C0_00 , S , 0 ),
187+ C0_02 = receive_consumer_update (So0 , C0_01 ),
188+
189+ C1_01 = register_sac (So1 , C1_00 , S , 1 ),
190+ C2_01 = register_sac (So2 , C2_00 , S , 2 ),
191+ SubIdToState0 = #{0 => {So0 , C0_02 },
192+ 1 => {So1 , C1_01 },
193+ 2 => {So2 , C2_01 }},
194+
195+ Consumers1 = query_consumers (Config , S ),
196+ assertSize (3 , Consumers1 ),
197+ assertConsumersConnected (Consumers1 ),
198+
199+ N1 = nodename (Config , 0 ),
200+ N2 = nodename (Config , 1 ),
201+ N3 = nodename (Config , 2 ),
202+
203+ % % N1 is the coordinator leader
204+ Isolated = N1 ,
205+ NotIsolated = N2 ,
206+ {value , DisconnectedConsumer } =
207+ lists :search (fun (# consumer {pid = ConnPid }) ->
208+ rpc (Config , erlang , node , [ConnPid ]) =:= Isolated
209+ end , Consumers1 ),
210+ # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
211+
212+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , N2 ),
213+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , N3 ),
214+
215+ wait_for_disconnected_consumer (Config , NotIsolated , S ),
216+ wait_for_forgotten_consumer (Config , NotIsolated , S ),
217+
218+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , N2 ),
219+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , N3 ),
220+
221+ wait_for_coordinator_ready (Config ),
222+
223+ wait_for_all_consumers_connected (Config , NotIsolated , S ),
224+
225+ Consumers2 = query_consumers (Config , NotIsolated , S ),
226+
227+ % % the disconnected, then forgotten consumer is cancelled,
228+ % % because the stream member on its node has been restarted
229+ assertSize (2 , Consumers2 ),
230+ assertConsumersConnected (Consumers2 ),
231+ assertEmpty (lists :filter (fun (C ) ->
232+ same_consumer (DisconnectedConsumer , C )
233+ end , Consumers2 )),
234+
235+ [# consumer {subscription_id = ActiveSubId }] =
236+ lists :filter (fun (# consumer {status = St }) ->
237+ St =:= {connected , active }
238+ end , Consumers2 ),
239+
240+ SubIdToState1 =
241+ maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
242+ % % cancelled consumer received a metadata update
243+ C1 = receive_metadata_update (S0 , C0 ),
244+ Acc #{K => {S0 , C1 }};
245+ (K , {S0 , C0 }, Acc ) when K == ActiveSubId ->
246+ % % promoted consumer should have received consumer update
247+ C1 = receive_consumer_update_and_respond (S0 , C0 ),
248+ Acc #{K => {S0 , C1 }};
249+ (K , {S0 , C0 }, Acc ) ->
250+ Acc #{K => {S0 , C0 }}
251+ end , #{}, SubIdToState0 ),
252+
253+ delete_stream (L # node .stream_port , S ),
254+
255+ % % online consumers should receive a metadata update frame (stream deleted)
256+ % % we unqueue this frame before closing the connection
257+ % % directly closing the connection of the cancelled consumer
258+ maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
259+ {_ , C1 } = receive_commands (S0 , C0 ),
260+ {ok , _ } = stream_test_utils :close (S0 , C1 );
261+ (_ , {S0 , C0 }) ->
262+ {ok , _ } = stream_test_utils :close (S0 , C0 )
263+ end , SubIdToState1 ),
264+
265+ ok .
266+
267+ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition (Config ) ->
170268 Ss = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
171269 [_ , Partition , _ ] = init_super_stream (Config , Ss ),
172270 [L , F1 , F2 ] = topology (Config , Partition ),
@@ -250,7 +348,7 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
250348 Acc #{K => {S0 , C0 }}
251349 end , #{}, SubIdToState0 ),
252350
253- delete_super_stream (Config , Ss , L # node .stream_port ),
351+ delete_super_stream (L # node .stream_port , Ss ),
254352
255353 % % online consumers should receive a metadata update frame (stream deleted)
256354 % % we unqueue this frame before closing the connection
@@ -261,7 +359,6 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
261359 (_ , {S0 , C0 }) ->
262360 {ok , _ } = stream_test_utils :close (S0 , C0 )
263361 end , SubIdToState1 ),
264-
265362 ok .
266363
267364same_consumer (# consumer {owner = P1 , subscription_id = Id1 },
@@ -274,12 +371,18 @@ same_consumer(_, _) ->
274371cluster_nodes (Config ) ->
275372 lists :map (fun (N ) ->
276373 # node {name = node_config (Config , N , nodename ),
277- stream_port = node_config (Config , N , tcp_port_stream )}
374+ stream_port = stream_port (Config , N )}
278375 end , lists :seq (0 , node_count (Config ) - 1 )).
279376
280377node_count (Config ) ->
281378 test_server :lookup_config (rmq_nodes_count , Config ).
282379
380+ nodename (Config , N ) ->
381+ node_config (Config , N , nodename ).
382+
383+ stream_port (Config , N ) ->
384+ node_config (Config , N , tcp_port_stream ).
385+
283386node_config (Config , N , K ) ->
284387 rabbit_ct_broker_helpers :get_node_config (Config , N , K ).
285388
@@ -318,13 +421,15 @@ stream_members(Config, Stream) ->
318421 [StreamId , State ]),
319422 Members .
320423
321- create_stream (Config , St ) ->
322- {ok , S , C0 } = stream_test_utils :connect (Config , 0 ),
424+ init_stream (Config , Node , St ) ->
425+ {ok , S , C0 } = stream_test_utils :connect (Config , Node ),
323426 {ok , C1 } = stream_test_utils :create_stream (S , C0 , St ),
427+ NC = node_count (Config ),
428+ wait_for_members (S , C1 , St , NC ),
324429 {ok , _ } = stream_test_utils :close (S , C1 ).
325430
326- delete_stream (Config , St ) ->
327- {ok , S , C0 } = stream_test_utils :connect (Config , 0 ),
431+ delete_stream (Port , St ) ->
432+ {ok , S , C0 } = stream_test_utils :connect (Port ),
328433 {ok , C1 } = stream_test_utils :delete_stream (S , C0 , St ),
329434 {ok , _ } = stream_test_utils :close (S , C1 ).
330435
@@ -344,7 +449,7 @@ init_super_stream(Config, Ss) ->
344449 {ok , _ } = stream_test_utils :close (S , C1 ),
345450 Partitions .
346451
347- delete_super_stream (Config , Ss , Port ) ->
452+ delete_super_stream (Port , Ss ) ->
348453 {ok , S , C0 } = stream_test_utils :connect (Port ),
349454 SsDeletionFrame = request ({delete_super_stream , Ss }),
350455 ok = ? TRSPT :send (S , SsDeletionFrame ),
0 commit comments