1010-include_lib (" common_test/include/ct.hrl" ).
1111-include_lib (" eunit/include/eunit.hrl" ).
1212-include_lib (" amqp_client/include/amqp_client.hrl" ).
13- -compile ([nowarn_export_all , export_all ]).
13+ -include_lib (" rabbitmq_ct_helpers/include/rabbit_assert.hrl" ).
14+
15+ -export ([suite /0 ,
16+ all /0 ,
17+ groups /0 ,
18+ init_per_suite /1 ,
19+ end_per_suite /1 ,
20+ init_per_group /2 ,
21+ end_per_group /2 ,
22+ init_per_testcase /2 ,
23+ end_per_testcase /2 ,
24+
25+ join_khepri_khepri_cluster /1 ,
26+ join_mnesia_khepri_cluster /1 ,
27+ join_mnesia_khepri_cluster_reverse /1 ,
28+ join_khepri_mnesia_cluster /1 ,
29+ join_khepri_mnesia_cluster_reverse /1 ,
30+
31+ join_khepri_khepri_khepri_cluster /1 ,
32+ join_mnesia_khepri_khepri_cluster /1 ,
33+ join_mnesia_khepri_khepri_cluster_reverse /1 ,
34+ join_khepri_mnesia_khepri_cluster /1 ,
35+ join_khepri_mnesia_khepri_cluster_reverse /1 ,
36+ join_khepri_khepri_mnesia_cluster /1 ,
37+ join_khepri_khepri_mnesia_cluster_reverse /1 ,
38+ join_mnesia_mnesia_khepri_cluster /1 ,
39+ join_mnesia_mnesia_khepri_cluster_reverse /1 ,
40+ join_mnesia_khepri_mnesia_cluster /1 ,
41+ join_mnesia_khepri_mnesia_cluster_reverse /1 ,
42+ join_khepri_mnesia_mnesia_cluster /1 ,
43+ join_khepri_mnesia_mnesia_cluster_reverse /1 ,
44+
45+ join_khepri_while_in_minority /1
46+ ]).
1447
1548suite () ->
1649 [{timetrap , 5 * 60_000 }].
@@ -23,7 +56,8 @@ all() ->
2356groups () ->
2457 [
2558 {unclustered , [], [{cluster_size_2 , [], cluster_size_2_tests ()},
26- {cluster_size_3 , [], cluster_size_3_tests ()}]}
59+ {cluster_size_3 , [], cluster_size_3_tests ()},
60+ {cluster_size_5 , [], cluster_size_5_tests ()}]}
2761 ].
2862
2963cluster_size_2_tests () ->
@@ -52,6 +86,11 @@ cluster_size_3_tests() ->
5286 join_khepri_mnesia_mnesia_cluster_reverse
5387 ].
5488
89+ cluster_size_5_tests () ->
90+ [
91+ join_khepri_while_in_minority
92+ ].
93+
5594% % -------------------------------------------------------------------
5695% % Testsuite setup/teardown.
5796% % -------------------------------------------------------------------
@@ -78,7 +117,9 @@ init_per_group(unclustered, Config) ->
78117init_per_group (cluster_size_2 , Config ) ->
79118 rabbit_ct_helpers :set_config (Config , [{rmq_nodes_count , 2 }]);
80119init_per_group (cluster_size_3 , Config ) ->
81- rabbit_ct_helpers :set_config (Config , [{rmq_nodes_count , 3 }]).
120+ rabbit_ct_helpers :set_config (Config , [{rmq_nodes_count , 3 }]);
121+ init_per_group (cluster_size_5 , Config ) ->
122+ rabbit_ct_helpers :set_config (Config , [{rmq_nodes_count , 5 }]).
82123
83124end_per_group (_ , Config ) ->
84125 Config .
@@ -343,3 +384,121 @@ declare(Ch, Q) ->
343384 durable = true ,
344385 auto_delete = false ,
345386 arguments = []}).
387+
388+ join_khepri_while_in_minority (Config ) ->
389+ [Node1 | ClusteredNodes ] = rabbit_ct_broker_helpers :get_node_configs (
390+ Config , nodename ),
391+ [NodeToJoin | OtherNodes ] = ClusteredNodes ,
392+
393+ % % Cluster nodes 2 to 5.
394+ ct :pal (" Cluster nodes ~p " , [ClusteredNodes ]),
395+ lists :foreach (
396+ fun (Node ) ->
397+ ? assertEqual (
398+ ok ,
399+ rabbit_control_helper :command (
400+ join_cluster , Node , [atom_to_list (NodeToJoin )], []))
401+ end , OtherNodes ),
402+ lists :foreach (
403+ fun (Node ) ->
404+ ? awaitMatch (
405+ ClusteredNodes ,
406+ lists :sort (
407+ rabbit_ct_broker_helpers :rpc (
408+ Config , Node , rabbit_nodes , list_members , [])),
409+ 30000 )
410+ end , ClusteredNodes ),
411+
412+ % % Enable Khepri on all nodes. Only `Node2' is given here because it is
413+ % % clustered with `OtherNodes'.
414+ ct :pal (" Enable `khepri_db` on nodes ~0p and ~0p " , [Node1 , NodeToJoin ]),
415+ Ret1 = rabbit_ct_broker_helpers :enable_feature_flag (
416+ Config , [Node1 , NodeToJoin ], khepri_db ),
417+ case Ret1 of
418+ ok ->
419+ StoreId = rabbit_khepri :get_store_id (),
420+ LeaderId = rabbit_ct_broker_helpers :rpc (
421+ Config , NodeToJoin ,
422+ ra_leaderboard , lookup_leader , [StoreId ]),
423+ {StoreId , LeaderNode } = LeaderId ,
424+
425+ % % Stop all clustered nodes except one follower to create a
426+ % % minority. In other words, we stop two followers, then the
427+ % % leader.
428+ % %
429+ % % Using `lists:reverse/1', we keep the last running followe only
430+ % % to see how clustering works if the first nodes in the cluster
431+ % % are down.
432+ Followers = ClusteredNodes -- [LeaderNode ],
433+ [FollowerToKeep | FollowersToStop ] = lists :reverse (Followers ),
434+
435+ lists :foreach (
436+ fun (Node ) ->
437+ ct :pal (" Stop node ~0p " , [Node ]),
438+ ok = rabbit_ct_broker_helpers :stop_node (Config , Node )
439+ end , FollowersToStop ++ [LeaderNode ]),
440+
441+ % % Try and fail to cluster `Node1' with the others.
442+ ct :pal (" Try to cluster node ~0p with ~0p " , [Node1 , FollowerToKeep ]),
443+ Ret2 = rabbit_control_helper :command (
444+ join_cluster , Node1 , [atom_to_list (FollowerToKeep )], []),
445+ ? assertMatch ({error , 75 , _ }, Ret2 ),
446+ {error , _ , Msg } = Ret2 ,
447+ ? assertEqual (
448+ match ,
449+ re :run (
450+ Msg , " Khepri cluster could be in minority" ,
451+ [{capture , none }])),
452+
453+ % % `Node1' should still be up and running correctly.
454+ ct :pal (" Open a connection + channel to node ~0p " , [Node1 ]),
455+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (
456+ Config , Node1 ),
457+
458+ QName = atom_to_binary (? FUNCTION_NAME ),
459+ QArgs = [{<<" x-queue-type" >>, longstr , <<" quorum" >>}],
460+ ct :pal (" Declare queue ~0p " , [QName ]),
461+ amqp_channel :call (
462+ Ch , # 'queue.declare' {durable = true ,
463+ queue = QName ,
464+ arguments = QArgs }),
465+
466+ ct :pal (" Enable publish confirms" ),
467+ amqp_channel :call (Ch , # 'confirm.select' {}),
468+
469+ ct :pal (" Publish a message to queue ~0p " , [QName ]),
470+ amqp_channel :cast (
471+ Ch ,
472+ # 'basic.publish' {routing_key = QName },
473+ # amqp_msg {props = # 'P_basic' {delivery_mode = 2 }}),
474+ amqp_channel :wait_for_confirms (Ch ),
475+
476+ ct :pal (" Subscribe to queue ~0p " , [QName ]),
477+ CTag = <<" ctag" >>,
478+ amqp_channel :subscribe (
479+ Ch ,
480+ # 'basic.consume' {queue = QName ,
481+ consumer_tag = CTag },
482+ self ()),
483+ receive
484+ # 'basic.consume_ok' {consumer_tag = CTag } ->
485+ ok
486+ after 10000 ->
487+ exit (consume_ok_timeout )
488+ end ,
489+
490+ ct :pal (" Consume a message from queue ~0p " , [QName ]),
491+ receive
492+ {# 'basic.deliver' {consumer_tag = <<" ctag" >>}, _ } ->
493+ ok
494+ after 10000 ->
495+ exit (deliver_timeout )
496+ end ,
497+
498+ ct :pal (" Close channel + connection" ),
499+ rabbit_ct_client_helpers :close_connection_and_channel (Conn , Ch ),
500+
501+ ok ;
502+ {skip , _ } = Skip ->
503+ Skip
504+ end .
0 commit comments