@@ -276,14 +276,15 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
276276 CL = coordinator_leader (Config ),
277277
278278 Ss = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
279- [Partition , _ , _ ] = init_super_stream (Config , CL , Ss ),
279+
280+ Partition = init_super_stream (Config , CL , Ss , 1 , CL ),
280281 [L , F1 , F2 ] = topology (Config , Partition ),
281282
282283 wait_for_coordinator_ready (Config ),
283284
284285 % % we expect the stream leader and the coordinator leader to be on the same node
285286 % % another node will be isolated
286- ? assertEqual (L # node .name , coordinator_leader ( Config ) ),
287+ ? assertEqual (L # node .name , CL ),
287288
288289 {ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
289290 {ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
@@ -362,15 +363,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
362363super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition (Config ) ->
363364 init_coordinator (Config ),
364365 CL = coordinator_leader (Config ),
366+ [CF1 , _ ] = all_nodes (Config ) -- [CL ],
365367 Ss = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
366- [ _ , Partition , _ ] = init_super_stream (Config , CL , Ss ),
368+ Partition = init_super_stream (Config , CL , Ss , 2 , CF1 ),
367369 [L , F1 , F2 ] = topology (Config , Partition ),
368370
369371 wait_for_coordinator_ready (Config ),
370372
371373 % % check stream leader and coordinator are not on the same node
372374 % % the coordinator leader node will be isolated
373- ? assertNotEqual (L # node .name , coordinator_leader ( Config ) ),
375+ ? assertNotEqual (L # node .name , CL ),
374376
375377 {ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
376378 {ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
@@ -533,7 +535,7 @@ delete_stream(Port, St) ->
533535 {ok , C1 } = stream_test_utils :delete_stream (S , C0 , St ),
534536 {ok , _ } = stream_test_utils :close (S , C1 ).
535537
536- init_super_stream (Config , Node , Ss ) ->
538+ init_super_stream (Config , Node , Ss , PartitionIndex , ExpectedNode ) ->
537539 {ok , S , C0 } = stream_test_utils :connect (Config , Node ),
538540 NC = node_count (Config ),
539541 Partitions = [unicode :characters_to_binary ([Ss , <<" -" >>, integer_to_binary (N )])
@@ -544,10 +546,46 @@ init_super_stream(Config, Node, Ss) ->
544546 {Cmd1 , C1 } = receive_commands (S , C0 ),
545547 ? assertMatch ({response , ? CORR_ID , {create_super_stream , ? RESPONSE_CODE_OK }},
546548 Cmd1 ),
547- Partition = lists :nth (2 , Partitions ),
548- wait_for_members (S , C1 , Partition , NC ),
549+ [wait_for_members (S , C1 , P , NC ) || P <- Partitions ],
550+ Partition = lists :nth (PartitionIndex , Partitions ),
551+ [# node {name = LN } | _ ] = topology (Config , Partition ),
552+ P = case LN of
553+ ExpectedNode ->
554+ Partition ;
555+ _ ->
556+ enforce_stream_leader_on_node (Config , S , C1 ,
557+ Partitions , Partition ,
558+ ExpectedNode , 10 )
559+ end ,
549560 {ok , _ } = stream_test_utils :close (S , C1 ),
550- Partitions .
561+ P .
562+
563+
564+ enforce_stream_leader_on_node (_ , _ , _ , _ , _ , _ , 0 ) ->
565+ ct :fail (" could not create super stream partition on chosen node" );
566+ enforce_stream_leader_on_node (Config , S , C ,
567+ Partitions , Partition , Node , Count ) ->
568+ CL = coordinator_leader (Config ),
569+ NC = node_count (Config ),
570+ [begin
571+ case P of
572+ Partition ->
573+ restart_stream (Config , CL , P , Node );
574+ _ ->
575+ restart_stream (Config , CL , P , undefined )
576+ end ,
577+ wait_for_members (S , C , P , NC )
578+ end || P <- Partitions ],
579+ [# node {name = LN } | _ ] = topology (Config , Partition ),
580+ case LN of
581+ Node ->
582+ Partition ;
583+ _ ->
584+ timer :sleep (500 ),
585+ enforce_stream_leader_on_node (Config , S , C ,
586+ Partitions , Partition , Node ,
587+ Count - 1 )
588+ end .
551589
552590delete_super_stream (Port , Ss ) ->
553591 {ok , S , C0 } = stream_test_utils :connect (Port ),
@@ -631,6 +669,13 @@ coordinator_leader(Config) ->
631669 undefined
632670 end .
633671
672+ restart_stream (Config , Node , S , undefined ) ->
673+ rpc (Config , Node , rabbit_stream_queue , restart_stream , [<<" /" >>, S , #{}]);
674+ restart_stream (Config , Node , S , Leader ) ->
675+ Opts = #{preferred_leader_node => Leader },
676+ rpc (Config , Node , rabbit_stream_queue , restart_stream , [<<" /" >>, S , Opts ]).
677+
678+
634679rpc (Config , M , F , A ) ->
635680 rpc (Config , 0 , M , F , A ).
636681
0 commit comments