1111% % The Original Code is RabbitMQ.
1212% %
1313% % The Initial Developer of the Original Code is Pivotal Software, Inc.
14- % % Copyright (c) 2020- 2025 Broadcom. All Rights Reserved.
14+ % % Copyright (c) 2025 Broadcom. All Rights Reserved.
1515% % The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
1616% %
1717
@@ -38,6 +38,7 @@ groups() ->
3838 [{cluster , [],
3939 [simple_sac_consumer_should_get_disconnected_on_network_partition ,
4040 simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition ,
41+ super_stream_sac_consumer_should_get_disconnected_on_network_partition ,
4142 super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition ]}
4243 ].
4344
@@ -94,13 +95,22 @@ end_per_testcase(TestCase, Config) ->
9495
9596
9697simple_sac_consumer_should_get_disconnected_on_network_partition (Config ) ->
98+ init_coordinator (Config ),
99+ CL = coordinator_leader (Config ),
100+
97101 S = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
102+ init_stream (Config , CL , S ),
103+
104+ [L , F1 , F2 ] = topology (Config , S ),
105+
106+ % % the stream leader and the coordinator leader are on the same node
107+ % % another node will be isolated
108+ ? assertEqual (L # node .name , coordinator_leader (Config )),
109+
98110 {ok , So0 , C0_00 } = stream_test_utils :connect (Config , 0 ),
99111 {ok , So1 , C1_00 } = stream_test_utils :connect (Config , 1 ),
100112 {ok , So2 , C2_00 } = stream_test_utils :connect (Config , 2 ),
101113
102- init_stream (Config , 0 , S ),
103-
104114 C0_01 = register_sac (So0 , C0_00 , S , 0 ),
105115 C0_02 = receive_consumer_update (So0 , C0_01 ),
106116
@@ -110,33 +120,33 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
110120 1 => {So1 , C1_01 },
111121 2 => {So2 , C2_01 }},
112122
113- Members = stream_members (Config , S ),
114- L = leader (Members ),
115- [F1 , F2 ] = followers (Members ),
116-
117123 Consumers1 = query_consumers (Config , S ),
118124 assertSize (3 , Consumers1 ),
119125 assertConsumersConnected (Consumers1 ),
120126
121- Isolated = F1 ,
127+ LN = L # node .name ,
128+ F1N = F1 # node .name ,
129+ F2N = F2 # node .name ,
130+
131+ Isolated = F1N ,
122132 {value , DisconnectedConsumer } =
123133 lists :search (fun (# consumer {pid = ConnPid }) ->
124134 rpc (Config , erlang , node , [ConnPid ]) =:= Isolated
125135 end , Consumers1 ),
126136 # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
127137
128- rabbit_ct_broker_helpers :block_traffic_between (Isolated , L ),
129- rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2 ),
138+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , LN ),
139+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2N ),
130140
131- wait_for_disconnected_consumer (Config , L , S ),
132- wait_for_forgotten_consumer (Config , L , S ),
141+ wait_for_disconnected_consumer (Config , LN , S ),
142+ wait_for_forgotten_consumer (Config , LN , S ),
133143
134- rabbit_ct_broker_helpers :allow_traffic_between (Isolated , L ),
135- rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2 ),
144+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , LN ),
145+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2N ),
136146
137- wait_for_all_consumers_connected (Config , L , S ),
147+ wait_for_all_consumers_connected (Config , LN , S ),
138148
139- Consumers2 = query_consumers (Config , L , S ),
149+ Consumers2 = query_consumers (Config , LN , S ),
140150 % % the disconnected, then forgotten consumer is cancelled,
141151 % % because the stream member on its node has been restarted
142152 assertSize (2 , Consumers2 ),
@@ -168,20 +178,21 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
168178 ok .
169179
170180simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition (Config ) ->
171- % % to initialize the stream coordinator on the first node
172- init_stream ( Config , 0 , << " dummy " >> ),
173- delete_stream ( stream_port ( Config , 0 ), << " dummy " >>) ,
181+ init_coordinator ( Config ),
182+ CL = coordinator_leader ( Config ),
183+ [ CF1 , CF2 ] = all_nodes ( Config ) -- [ CL ] ,
174184
175185 S = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
176-
177- init_stream (Config , 1 , S ),
186+ init_stream (Config , CF1 , S ),
178187 [L , _F1 , _F2 ] = topology (Config , S ),
179188
180- wait_for_coordinator_ready (Config ),
189+ % % the stream leader and the coordinator leader are not on the same node
190+ % % the coordinator leader node will be isolated
191+ ? assertNotEqual (L # node .name , CL ),
181192
182- {ok , So0 , C0_00 } = stream_test_utils :connect (Config , 0 ),
183- {ok , So1 , C1_00 } = stream_test_utils :connect (Config , 1 ),
184- {ok , So2 , C2_00 } = stream_test_utils :connect (Config , 2 ),
193+ {ok , So0 , C0_00 } = stream_test_utils :connect (Config , CL ),
194+ {ok , So1 , C1_00 } = stream_test_utils :connect (Config , CF1 ),
195+ {ok , So2 , C2_00 } = stream_test_utils :connect (Config , CF2 ),
185196
186197 C0_01 = register_sac (So0 , C0_00 , S , 0 ),
187198 C0_02 = receive_consumer_update (So0 , C0_01 ),
@@ -196,27 +207,23 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
196207 assertSize (3 , Consumers1 ),
197208 assertConsumersConnected (Consumers1 ),
198209
199- N1 = nodename (Config , 0 ),
200- N2 = nodename (Config , 1 ),
201- N3 = nodename (Config , 2 ),
202-
203210 % % N1 is the coordinator leader
204- Isolated = N1 ,
205- NotIsolated = N2 ,
211+ Isolated = CL ,
212+ NotIsolated = CF1 ,
206213 {value , DisconnectedConsumer } =
207214 lists :search (fun (# consumer {pid = ConnPid }) ->
208215 rpc (Config , erlang , node , [ConnPid ]) =:= Isolated
209216 end , Consumers1 ),
210217 # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
211218
212- rabbit_ct_broker_helpers :block_traffic_between (Isolated , N2 ),
213- rabbit_ct_broker_helpers :block_traffic_between (Isolated , N3 ),
219+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , CF1 ),
220+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , CF2 ),
214221
215222 wait_for_disconnected_consumer (Config , NotIsolated , S ),
216223 wait_for_forgotten_consumer (Config , NotIsolated , S ),
217224
218- rabbit_ct_broker_helpers :allow_traffic_between (Isolated , N2 ),
219- rabbit_ct_broker_helpers :allow_traffic_between (Isolated , N3 ),
225+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , CF1 ),
226+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , CF2 ),
220227
221228 wait_for_coordinator_ready (Config ),
222229
@@ -264,13 +271,107 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
264271
265272 ok .
266273
274+ super_stream_sac_consumer_should_get_disconnected_on_network_partition (Config ) ->
275+ init_coordinator (Config ),
276+ CL = coordinator_leader (Config ),
277+
278+ Ss = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
279+ [Partition , _ , _ ] = init_super_stream (Config , CL , Ss ),
280+ [L , F1 , F2 ] = topology (Config , Partition ),
281+
282+ wait_for_coordinator_ready (Config ),
283+
284+ % % we expect the stream leader and the coordinator leader to be on the same node
285+ % % another node will be isolated
286+ ? assertEqual (L # node .name , coordinator_leader (Config )),
287+
288+ {ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
289+ {ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
290+ {ok , So2 , C2_00 } = stream_test_utils :connect (F2 # node .stream_port ),
291+
292+ C0_01 = register_sac (So0 , C0_00 , Partition , 0 , Ss ),
293+ C0_02 = receive_consumer_update (So0 , C0_01 ),
294+
295+ C1_01 = register_sac (So1 , C1_00 , Partition , 1 , Ss ),
296+ C2_01 = register_sac (So2 , C2_00 , Partition , 2 , Ss ),
297+ SubIdToState0 = #{0 => {So0 , C0_02 },
298+ 1 => {So1 , C1_01 },
299+ 2 => {So2 , C2_01 }},
300+
301+ Consumers1 = query_consumers (Config , Partition ),
302+ assertSize (3 , Consumers1 ),
303+ assertConsumersConnected (Consumers1 ),
304+
305+ LN = L # node .name ,
306+ F1N = F1 # node .name ,
307+ F2N = F2 # node .name ,
308+
309+ Isolated = F1N ,
310+ NotIsolated = F2N ,
311+ {value , DisconnectedConsumer } =
312+ lists :search (fun (# consumer {pid = ConnPid }) ->
313+ rpc (Config , erlang , node , [ConnPid ]) =:= Isolated
314+ end , Consumers1 ),
315+ # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
316+
317+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , LN ),
318+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2N ),
319+
320+ wait_for_disconnected_consumer (Config , NotIsolated , Partition ),
321+ wait_for_forgotten_consumer (Config , NotIsolated , Partition ),
322+
323+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , LN ),
324+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2N ),
325+
326+ wait_for_coordinator_ready (Config ),
327+
328+ wait_for_all_consumers_connected (Config , NotIsolated , Partition ),
329+
330+ Consumers2 = query_consumers (Config , NotIsolated , Partition ),
331+
332+ % % the disconnected, then forgotten consumer is cancelled,
333+ % % because the stream member on its node has been restarted
334+ assertSize (2 , Consumers2 ),
335+ assertConsumersConnected (Consumers2 ),
336+ assertEmpty (lists :filter (fun (C ) ->
337+ same_consumer (DisconnectedConsumer , C )
338+ end , Consumers2 )),
339+
340+ SubIdToState1 =
341+ maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
342+ % % cancelled consumer received a metadata update
343+ C1 = receive_metadata_update (S0 , C0 ),
344+ Acc #{K => {S0 , C1 }};
345+ (K , {S0 , C0 }, Acc ) ->
346+ Acc #{K => {S0 , C0 }}
347+ end , #{}, SubIdToState0 ),
348+
349+ delete_super_stream (L # node .stream_port , Ss ),
350+
351+ % % online consumers should receive a metadata update frame (stream deleted)
352+ % % we unqueue this frame before closing the connection
353+ % % directly closing the connection of the cancelled consumer
354+ maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
355+ {_ , C1 } = receive_commands (S0 , C0 ),
356+ {ok , _ } = stream_test_utils :close (S0 , C1 );
357+ (_ , {S0 , C0 }) ->
358+ {ok , _ } = stream_test_utils :close (S0 , C0 )
359+ end , SubIdToState1 ),
360+ ok .
361+
267362super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition (Config ) ->
363+ init_coordinator (Config ),
364+ CL = coordinator_leader (Config ),
268365 Ss = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
269- [_ , Partition , _ ] = init_super_stream (Config , Ss ),
366+ [_ , Partition , _ ] = init_super_stream (Config , CL , Ss ),
270367 [L , F1 , F2 ] = topology (Config , Partition ),
271368
272369 wait_for_coordinator_ready (Config ),
273370
371+ % % check stream leader and coordinator are not on the same node
372+ % % the coordinator leader node will be isolated
373+ ? assertNotEqual (L # node .name , coordinator_leader (Config )),
374+
274375 {ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
275376 {ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
276377 {ok , So2 , C2_00 } = stream_test_utils :connect (F2 # node .stream_port ),
@@ -406,13 +507,6 @@ leader(Members) ->
406507 Acc
407508 end , undefined , Members ).
408509
409- followers (Members ) ->
410- maps :fold (fun (Node , {_ , replica }, Acc ) ->
411- [Node | Acc ];
412- (_ , _ , Acc ) ->
413- Acc
414- end , [], Members ).
415-
416510stream_members (Config , Stream ) ->
417511 {ok , Q } = rpc (Config , rabbit_amqqueue , lookup , [Stream , <<" /" >>]),
418512 #{name := StreamId } = amqqueue :get_type_state (Q ),
@@ -421,8 +515,14 @@ stream_members(Config, Stream) ->
421515 [StreamId , State ]),
422516 Members .
423517
424- init_stream (Config , Node , St ) ->
425- {ok , S , C0 } = stream_test_utils :connect (Config , Node ),
518+ init_coordinator (Config ) ->
519+ % % to make sure the coordinator is initialized
520+ init_stream (Config , 0 , <<" dummy" >>),
521+ delete_stream (stream_port (Config , 0 ), <<" dummy" >>),
522+ wait_for_coordinator_ready (Config ).
523+
524+ init_stream (Config , N , St ) ->
525+ {ok , S , C0 } = stream_test_utils :connect (stream_port (Config , N )),
426526 {ok , C1 } = stream_test_utils :create_stream (S , C0 , St ),
427527 NC = node_count (Config ),
428528 wait_for_members (S , C1 , St , NC ),
@@ -433,8 +533,8 @@ delete_stream(Port, St) ->
433533 {ok , C1 } = stream_test_utils :delete_stream (S , C0 , St ),
434534 {ok , _ } = stream_test_utils :close (S , C1 ).
435535
436- init_super_stream (Config , Ss ) ->
437- {ok , S , C0 } = stream_test_utils :connect (Config , 0 ),
536+ init_super_stream (Config , Node , Ss ) ->
537+ {ok , S , C0 } = stream_test_utils :connect (Config , Node ),
438538 NC = node_count (Config ),
439539 Partitions = [unicode :characters_to_binary ([Ss , <<" -" >>, integer_to_binary (N )])
440540 || N <- lists :seq (0 , NC - 1 )],
@@ -509,9 +609,28 @@ query_consumers(Config, Node, Stream) ->
509609 rpc (Config , Node , rabbit_stream_coordinator , sac_state , []),
510610 Consumers .
511611
612+
613+ all_nodes (Config ) ->
614+ lists :map (fun (N ) ->
615+ nodename (Config , N )
616+ end , lists :seq (0 , node_count (Config ) - 1 )).
617+
512618coordinator_status (Config ) ->
513619 rpc (Config , rabbit_stream_coordinator , status , []).
514620
621+ coordinator_leader (Config ) ->
622+ Status = coordinator_status (Config ),
623+ case lists :search (fun (St ) ->
624+ RS = proplists :get_value (<<" Raft State" >>, St ,
625+ undefined ),
626+ RS == leader
627+ end , Status ) of
628+ {value , Leader } ->
629+ proplists :get_value (<<" Node Name" >>, Leader , undefined );
630+ _ ->
631+ undefined
632+ end .
633+
515634rpc (Config , M , F , A ) ->
516635 rpc (Config , 0 , M , F , A ).
517636
0 commit comments