1212% %
1313% % The Initial Developer of the Original Code is Pivotal Software, Inc.
1414% % Copyright (c) 2025 Broadcom. All Rights Reserved.
15- % % The term “ Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
15+ % % The term " Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
1616% %
1717
1818-module (rabbit_stream_partitions_SUITE ).
@@ -107,6 +107,8 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
107107 % % another node will be isolated
108108 ? assertEqual (L # node .name , coordinator_leader (Config )),
109109
110+ log (" Stream leader and coordinator leader are on ~p " , [L # node .name ]),
111+
110112 {ok , So0 , C0_00 } = stream_test_utils :connect (Config , 0 ),
111113 {ok , So1 , C1_00 } = stream_test_utils :connect (Config , 1 ),
112114 {ok , So2 , C2_00 } = stream_test_utils :connect (Config , 2 ),
@@ -135,18 +137,24 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
135137 end , Consumers1 ),
136138 # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
137139
140+ log (" Isolating node ~p " , [Isolated ]),
141+
138142 rabbit_ct_broker_helpers :block_traffic_between (Isolated , LN ),
139143 rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2N ),
140144
141145 wait_for_disconnected_consumer (Config , LN , S ),
142146 wait_for_presumed_down_consumer (Config , LN , S ),
143147
148+ log (" Node ~p rejoins cluster" , [Isolated ]),
149+
144150 rabbit_ct_broker_helpers :allow_traffic_between (Isolated , LN ),
145151 rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2N ),
146152
147153 wait_for_all_consumers_connected (Config , LN , S ),
148154
149155 Consumers2 = query_consumers (Config , LN , S ),
156+ log (" Consumers after partition resolution: ~p " , [Consumers2 ]),
157+ log (" Disconnected consumer: ~p " , [DisconnectedConsumer ]),
150158 % % the disconnected, then presumed down consumer is cancelled,
151159 % % because the stream member on its node has been restarted
152160 assertSize (2 , Consumers2 ),
@@ -157,21 +165,28 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
157165 % % assert the cancelled consumer received a metadata update frame
158166 SubIdToState1 =
159167 maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
168+ log (" Expecting metadata update for disconnected consumer" ),
160169 C1 = receive_metadata_update (S0 , C0 ),
170+ log (" Received metadata update" ),
161171 Acc #{K => {S0 , C1 }};
162172 (K , {S0 , C0 }, Acc ) ->
163173 Acc #{K => {S0 , C0 }}
164174 end , #{}, SubIdToState0 ),
165175
176+ log (" Deleting stream" ),
166177 delete_stream (stream_port (Config , 0 ), S ),
167178
168179 % % online consumers should receive a metadata update frame (stream deleted)
169180 % % we unqueue the this frame before closing the connection
170181 % % directly closing the connection of the cancelled consumer
171182 maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
172- {_ , C1 } = receive_commands (S0 , C0 ),
183+ log (" Expecting frame in consumer ~p " , [K ]),
184+ {Cmd1 , C1 } = receive_commands (S0 , C0 ),
185+ log (" Received ~p " , [Cmd1 ]),
186+ log (" Closing" ),
173187 {ok , _ } = stream_test_utils :close (S0 , C1 );
174- (_ , {S0 , C0 }) ->
188+ (K , {S0 , C0 }) ->
189+ log (" Closing ~p " , [K ]),
175190 {ok , _ } = stream_test_utils :close (S0 , C0 )
176191 end , SubIdToState1 ),
177192
@@ -190,6 +205,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
190205 % % the coordinator leader node will be isolated
191206 ? assertNotEqual (L # node .name , CL ),
192207
208+ log (" Stream leader and coordinator leader are on ~p " , [L # node .name ]),
209+
193210 {ok , So0 , C0_00 } = stream_test_utils :connect (Config , CL ),
194211 {ok , So1 , C1_00 } = stream_test_utils :connect (Config , CF1 ),
195212 {ok , So2 , C2_00 } = stream_test_utils :connect (Config , CF2 ),
@@ -216,12 +233,16 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
216233 end , Consumers1 ),
217234 # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
218235
236+ log (" Isolating node ~p " , [Isolated ]),
237+
219238 rabbit_ct_broker_helpers :block_traffic_between (Isolated , CF1 ),
220239 rabbit_ct_broker_helpers :block_traffic_between (Isolated , CF2 ),
221240
222241 wait_for_disconnected_consumer (Config , NotIsolated , S ),
223242 wait_for_presumed_down_consumer (Config , NotIsolated , S ),
224243
244+ log (" Node ~p rejoins cluster" , [Isolated ]),
245+
225246 rabbit_ct_broker_helpers :allow_traffic_between (Isolated , CF1 ),
226247 rabbit_ct_broker_helpers :allow_traffic_between (Isolated , CF2 ),
227248
@@ -231,6 +252,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
231252
232253 Consumers2 = query_consumers (Config , NotIsolated , S ),
233254
255+ log (" Consumers after partition resolution ~p " , [Consumers2 ]),
256+ log (" Disconnected consumer: ~p " , [DisconnectedConsumer ]),
234257 % % the disconnected, then presumed down consumer is cancelled,
235258 % % because the stream member on its node has been restarted
236259 assertSize (2 , Consumers2 ),
@@ -246,26 +269,35 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
246269
247270 SubIdToState1 =
248271 maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
272+ log (" Expecting metadata update for disconnected consumer" ),
249273 % % cancelled consumer received a metadata update
250274 C1 = receive_metadata_update (S0 , C0 ),
275+ log (" Received metadata update" ),
251276 Acc #{K => {S0 , C1 }};
252277 (K , {S0 , C0 }, Acc ) when K == ActiveSubId ->
278+ log (" Expecting consumer update for promoted consumer" ),
253279 % % promoted consumer should have received consumer update
254280 C1 = receive_consumer_update_and_respond (S0 , C0 ),
281+ log (" Received consumer update" ),
255282 Acc #{K => {S0 , C1 }};
256283 (K , {S0 , C0 }, Acc ) ->
257284 Acc #{K => {S0 , C0 }}
258285 end , #{}, SubIdToState0 ),
259286
287+ log (" Deleting stream" ),
260288 delete_stream (L # node .stream_port , S ),
261289
262290 % % online consumers should receive a metadata update frame (stream deleted)
263291 % % we unqueue this frame before closing the connection
264292 % % directly closing the connection of the cancelled consumer
265293 maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
266- {_ , C1 } = receive_commands (S0 , C0 ),
294+ log (" Expecting frame in consumer ~p " , [K ]),
295+ {Cmd1 , C1 } = receive_commands (S0 , C0 ),
296+ log (" Received ~p " , [Cmd1 ]),
297+ log (" Closing" ),
267298 {ok , _ } = stream_test_utils :close (S0 , C1 );
268- (_ , {S0 , C0 }) ->
299+ (K , {S0 , C0 }) ->
300+ log (" Closing ~p " , [K ]),
269301 {ok , _ } = stream_test_utils :close (S0 , C0 )
270302 end , SubIdToState1 ),
271303
@@ -286,6 +318,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
286318 % % another node will be isolated
287319 ? assertEqual (L # node .name , CL ),
288320
321+ log (" Stream leader and coordinator leader are on ~p " , [L # node .name ]),
322+
289323 {ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
290324 {ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
291325 {ok , So2 , C2_00 } = stream_test_utils :connect (F2 # node .stream_port ),
@@ -315,12 +349,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
315349 end , Consumers1 ),
316350 # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
317351
352+ log (" Isolating node ~p " , [Isolated ]),
353+
318354 rabbit_ct_broker_helpers :block_traffic_between (Isolated , LN ),
319355 rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2N ),
320356
321357 wait_for_disconnected_consumer (Config , NotIsolated , Partition ),
322358 wait_for_presumed_down_consumer (Config , NotIsolated , Partition ),
323359
360+ log (" Node ~p rejoins cluster" , [Isolated ]),
361+
324362 rabbit_ct_broker_helpers :allow_traffic_between (Isolated , LN ),
325363 rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2N ),
326364
@@ -329,6 +367,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
329367 wait_for_all_consumers_connected (Config , NotIsolated , Partition ),
330368
331369 Consumers2 = query_consumers (Config , NotIsolated , Partition ),
370+ log (" Consumers after partition resolution: ~p " , [Consumers2 ]),
371+ log (" Disconnected consumer: ~p " , [DisconnectedConsumer ]),
332372
333373 % % the disconnected, then presumed down consumer is cancelled,
334374 % % because the stream member on its node has been restarted
@@ -340,22 +380,29 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
340380
341381 SubIdToState1 =
342382 maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
383+ log (" Expecting metadata update for disconnected consumer" ),
343384 % % cancelled consumer received a metadata update
344385 C1 = receive_metadata_update (S0 , C0 ),
386+ log (" Received metadata update" ),
345387 Acc #{K => {S0 , C1 }};
346388 (K , {S0 , C0 }, Acc ) ->
347389 Acc #{K => {S0 , C0 }}
348390 end , #{}, SubIdToState0 ),
349391
392+ log (" Deleting super stream" ),
350393 delete_super_stream (L # node .stream_port , Ss ),
351394
352395 % % online consumers should receive a metadata update frame (stream deleted)
353396 % % we unqueue this frame before closing the connection
354397 % % directly closing the connection of the cancelled consumer
355398 maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
356- {_ , C1 } = receive_commands (S0 , C0 ),
399+ log (" Expecting frame in consumer ~p " , [K ]),
400+ {Cmd1 , C1 } = receive_commands (S0 , C0 ),
401+ log (" Received ~p " , [Cmd1 ]),
402+ log (" Closing" ),
357403 {ok , _ } = stream_test_utils :close (S0 , C1 );
358- (_ , {S0 , C0 }) ->
404+ (K , {S0 , C0 }) ->
405+ log (" Closing ~p " , [K ]),
359406 {ok , _ } = stream_test_utils :close (S0 , C0 )
360407 end , SubIdToState1 ),
361408 ok .
@@ -374,6 +421,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
374421 % % the coordinator leader node will be isolated
375422 ? assertNotEqual (L # node .name , CL ),
376423
424+ log (" Stream leader and coordinator leader are on ~p " , [L # node .name ]),
425+
377426 {ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
378427 {ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
379428 {ok , So2 , C2_00 } = stream_test_utils :connect (F2 # node .stream_port ),
@@ -410,12 +459,16 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
410459 end , Consumers1 ),
411460 # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
412461
462+ log (" Isolating node ~p " , [Isolated ]),
463+
413464 rabbit_ct_broker_helpers :block_traffic_between (Isolated , LN ),
414465 rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2N ),
415466
416467 wait_for_disconnected_consumer (Config , NotIsolated , Partition ),
417468 wait_for_presumed_down_consumer (Config , NotIsolated , Partition ),
418469
470+ log (" Node ~p rejoins cluster" , [Isolated ]),
471+
419472 rabbit_ct_broker_helpers :allow_traffic_between (Isolated , LN ),
420473 rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2N ),
421474
@@ -424,6 +477,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
424477 wait_for_all_consumers_connected (Config , NotIsolated , Partition ),
425478
426479 Consumers2 = query_consumers (Config , NotIsolated , Partition ),
480+ log (" Consumers after partition resolution: ~p " , [Consumers2 ]),
481+ log (" Disconnected consumer: ~p " , [DisconnectedConsumer ]),
427482
428483 % % the disconnected, then presumed down consumer is cancelled,
429484 % % because the stream member on its node has been restarted
@@ -440,27 +495,35 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
440495
441496 SubIdToState1 =
442497 maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
498+ log (" Expecting metadata update for disconnected consumer" ),
443499 % % cancelled consumer received a metadata update
444500 C1 = receive_metadata_update (S0 , C0 ),
501+ log (" Received metadata update" ),
445502 Acc #{K => {S0 , C1 }};
446503 (K , {S0 , C0 }, Acc ) when K == ActiveSubId ->
504+ log (" Expecting consumer update for promoted consumer" ),
447505 % % promoted consumer should have received consumer update
448506 C1 = receive_consumer_update_and_respond (S0 , C0 ),
507+ log (" Received consumer update" ),
449508 Acc #{K => {S0 , C1 }};
450509 (K , {S0 , C0 }, Acc ) ->
451510 Acc #{K => {S0 , C0 }}
452511 end , #{}, SubIdToState0 ),
453512
513+ log (" Deleting super stream" ),
454514 delete_super_stream (L # node .stream_port , Ss ),
455515
456516 % % online consumers should receive a metadata update frame (stream deleted)
457517 % % we unqueue this frame before closing the connection
458518 % % directly closing the connection of the cancelled consumer
459519 maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
520+ log (" Expecting frame in consumer ~p " , [K ]),
460521 {Cmd1 , C1 } = receive_commands (S0 , C0 ),
461- ct :pal (" Received command: ~p " , [Cmd1 ]),
522+ log (" Received ~p " , [Cmd1 ]),
523+ log (" Closing" ),
462524 {ok , _ } = stream_test_utils :close (S0 , C1 );
463- (_ , {S0 , C0 }) ->
525+ (K , {S0 , C0 }) ->
526+ log (" Closing ~p " , [K ]),
464527 {ok , _ } = stream_test_utils :close (S0 , C0 )
465528 end , SubIdToState1 ),
466529 ok .
@@ -727,6 +790,7 @@ wait_for_disconnected_consumer(Config, Node, Stream) ->
727790 rabbit_ct_helpers :await_condition (
728791 fun () ->
729792 Cs = query_consumers (Config , Node , Stream ),
793+ log (" Expecting a disconnected consumer: ~p " , [Cs ]),
730794 lists :any (fun (# consumer {status = {disconnected , _ }}) ->
731795 true ;
732796 (_ ) ->
@@ -738,6 +802,7 @@ wait_for_presumed_down_consumer(Config, Node, Stream) ->
738802 rabbit_ct_helpers :await_condition (
739803 fun () ->
740804 Cs = query_consumers (Config , Node , Stream ),
805+ log (" Expecting a presumed-down consumer: ~p " , [Cs ]),
741806 lists :any (fun (# consumer {status = {presumed_down , _ }}) ->
742807 true ;
743808 (_ ) ->
@@ -749,6 +814,7 @@ wait_for_all_consumers_connected(Config, Node, Stream) ->
749814 rabbit_ct_helpers :await_condition (
750815 fun () ->
751816 Cs = query_consumers (Config , Node , Stream ),
817+ log (" Expecting connected consumers: ~p " , [Cs ]),
752818 lists :all (fun (# consumer {status = {connected , _ }}) ->
753819 true ;
754820 (_ ) ->
@@ -761,6 +827,7 @@ wait_for_coordinator_ready(Config) ->
761827 rabbit_ct_helpers :await_condition (
762828 fun () ->
763829 Status = coordinator_status (Config ),
830+ log (" Coordinator status: ~p " , [Status ]),
764831 lists :all (fun (St ) ->
765832 RS = proplists :get_value (<<" Raft State" >>, St ,
766833 undefined ),
@@ -785,3 +852,9 @@ assertSize(Expected, List) when is_list(List) ->
785852
786853assertEmpty (Data ) ->
787854 assertSize (0 , Data ).
855+
856+ log (Format ) ->
857+ ct :pal (Format ).
858+
859+ log (Format , Args ) ->
860+ ct :pal (Format , Args ).
0 commit comments