1010-include_lib (" common_test/include/ct.hrl" ).
1111-include_lib (" amqp_client/include/amqp_client.hrl" ).
1212-include_lib (" eunit/include/eunit.hrl" ).
13+ -include_lib (" rabbitmq_ct_helpers/include/rabbit_assert.hrl" ).
1314
1415-compile (nowarn_export_all ).
1516-compile (export_all ).
@@ -41,7 +42,8 @@ groups() ->
4142 [{cluster_size_3 , [], [
4243 force_standalone_boot ,
4344 force_standalone_boot_and_restart ,
44- force_standalone_boot_and_restart_with_quorum_queues
45+ force_standalone_boot_and_restart_with_quorum_queues ,
46+ recover_after_partition_with_leader
4547 ]}
4648 ]},
4749 {clustered_5_nodes , [],
@@ -66,7 +68,9 @@ suite() ->
6668
6769init_per_suite (Config ) ->
6870 rabbit_ct_helpers :log_environment (),
69- rabbit_ct_helpers :run_setup_steps (Config ).
71+ rabbit_ct_helpers :run_setup_steps (
72+ Config ,
73+ [fun rabbit_ct_broker_helpers :configure_dist_proxy /1 ]).
7074
7175end_per_suite (Config ) ->
7276 rabbit_ct_helpers :run_teardown_steps (Config ).
@@ -249,6 +253,172 @@ force_standalone_boot_and_restart_with_quorum_queues(Config) ->
249253
250254 ok .
251255
256+ recover_after_partition_with_leader (Config ) ->
257+ Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
258+
259+ % % We use intermediate Erlang nodes between the common_test control node
260+ % % and the RabbitMQ nodes, using `peer' standard_io communication. The goal
261+ % % is to make sure the common_test control node doesn't interfere with the
262+ % % nodes the RabbitMQ nodes can see, despite the blocking of the Erlang
263+ % % distribution connection.
264+ Proxies0 = [begin
265+ {ok , Proxy , PeerNode } = peer :start_link (
266+ #{name => peer :random_name (),
267+ connection => standard_io ,
268+ wait_boot => 120000 }),
269+ ct :pal (" Proxy ~0p -> ~0p " , [Proxy , PeerNode ]),
270+ Proxy
271+ end || _ <- Nodes ],
272+ Proxies = maps :from_list (lists :zip (Nodes , Proxies0 )),
273+ ct :pal (" Proxies: ~p " , [Proxies ]),
274+ Config1 = [{proxies , Proxies } | Config ],
275+
276+ NodeA = hd (Nodes ),
277+
278+ ct :pal (" Prevent automatic reconnection on the common_test node" ),
279+ application :set_env (kernel , dist_auto_connect , never ),
280+ ct :pal (" Disconnect the common_test node from RabbitMQ nodes" ),
281+ lists :foreach (fun erlang :disconnect_node /1 , Nodes ),
282+ ct :pal (
283+ " Ensure RabbitMQ nodes only know about the RabbitMQ nodes "
284+ " (and their proxy)" ),
285+ lists :foreach (
286+ fun (Node ) ->
287+ ? awaitMatch (
288+ Nodes ,
289+ get_connected_nodes (Config1 , Node ),
290+ 30000 )
291+ end , Nodes ),
292+
293+ ct :pal (" Wait for a Khepri leader to be elected" ),
294+ ? awaitMatch ({ok , _ }, get_leader_node (Config1 , NodeA ), 30000 ),
295+
296+ ct :pal (" Query the Khepri leader nodename" ),
297+ {ok , Leader } = get_leader_node (Config1 , NodeA ),
298+ Followers = Nodes -- [Leader ],
299+ ct :pal (" Leader: ~0p~n Followers: ~p " , [Leader , Followers ]),
300+
301+ lists :foreach (
302+ fun (Follower ) ->
303+ ct :pal (
304+ ? LOW_IMPORTANCE ,
305+ " Blocking traffic between ~ts and ~ts " ,
306+ [Leader , Follower ]),
307+ ? assertEqual (
308+ ok ,
309+ proxied_rpc (
310+ Config1 , Leader , inet_tcp_proxy_dist , block , [Follower ])),
311+ ? assertEqual (
312+ ok ,
313+ proxied_rpc (
314+ Config1 , Follower , inet_tcp_proxy_dist , block , [Leader ]))
315+ end , Followers ),
316+
317+ ct :pal (
318+ " Ensure the leader node is disconnected from other RabbitMQ nodes" ),
319+ ? awaitMatch (
320+ [Leader ],
321+ get_connected_nodes (Config1 , Leader ),
322+ 30000 ),
323+ ct :pal (
324+ " Ensure the follower nodes are disconnected from the leader node" ),
325+ lists :foreach (
326+ fun (Follower ) ->
327+ ? awaitMatch (
328+ Followers ,
329+ get_connected_nodes (Config1 , Follower ),
330+ 30000 )
331+ end , Followers ),
332+
333+ ct :pal (" Wait for each side of the partition to have its own leader" ),
334+ Follower1 = hd (Followers ),
335+ ? awaitMatch (
336+ false ,
337+ begin
338+ LeaderA = get_leader_node (Config1 , Leader ),
339+ LeaderB = get_leader_node (Config1 , Follower1 ),
340+ ct :pal (" LeaderA: ~0p~n LeaderB: ~0p " , [LeaderA , LeaderB ]),
341+ LeaderA =:= LeaderB
342+ end ,
343+ 30000 ),
344+
345+ ct :pal (" Waiting for 2 minutes" ),
346+ timer :sleep (120000 ),
347+
348+ ct :pal (" Query Khepri status for each RabbitMQ node" ),
349+ PerNodeStatus1 = get_per_node_khepri_status (Config1 ),
350+ ct :pal (" Per-node Khepri status (during partition):~n~p " , [PerNodeStatus1 ]),
351+
352+ lists :foreach (
353+ fun (Follower ) ->
354+ ct :pal (
355+ ? LOW_IMPORTANCE ,
356+ " Unblocking traffic between ~ts and ~ts " ,
357+ [Leader , Follower ]),
358+ ? assertEqual (
359+ ok ,
360+ proxied_rpc (
361+ Config1 , Leader , inet_tcp_proxy_dist , allow , [Follower ])),
362+ ? assertEqual (
363+ ok ,
364+ proxied_rpc (
365+ Config1 , Follower , inet_tcp_proxy_dist , allow , [Leader ]))
366+ end , Followers ),
367+
368+ ct :pal (" Wait for the whole cluster to agree on the same leader" ),
369+ ? awaitMatch (
370+ true ,
371+ begin
372+ LeaderA = get_leader_node (Config1 , Leader ),
373+ LeaderB = get_leader_node (Config1 , Follower1 ),
374+ ct :pal (" LeaderA: ~0p~n LeaderB: ~0p " , [LeaderA , LeaderB ]),
375+ LeaderA =:= LeaderB
376+ end ,
377+ 30000 ),
378+
379+ ct :pal (" Query Khepri status for each RabbitMQ node" ),
380+ PerNodeStatus2 = get_per_node_khepri_status (Config1 ),
381+ ct :pal (" Per-node Khepri status (after recovery):~n~p " , [PerNodeStatus2 ]),
382+
383+ ct :pal (" Restore automatic reconnection on the common_test node" ),
384+ application :unset_env (kernel , dist_auto_connect ),
385+ ok .
386+
387+ proxied_rpc (Config , Node , Module , Function , Args ) ->
388+ Proxies = ? config (proxies , Config ),
389+ Proxy = maps :get (Node , Proxies ),
390+ peer :call (
391+ Proxy , rabbit_ct_broker_helpers , rpc ,
392+ [Config , Node , Module , Function , Args ]).
393+
394+ get_leader_node (Config , Node ) ->
395+ StoreId = rabbit_khepri :get_store_id (),
396+ Ret = proxied_rpc (
397+ Config , Node ,
398+ ra_leaderboard , lookup_leader , [StoreId ]),
399+ case Ret of
400+ {StoreId , LeaderNode } ->
401+ {ok , LeaderNode };
402+ undefined ->
403+ {error , no_leader }
404+ end .
405+
406+ get_connected_nodes (Config , Node ) ->
407+ Proxies = ? config (proxies , Config ),
408+ Proxy = maps :get (Node , Proxies ),
409+ Peer = peer :call (Proxy , erlang , node , []),
410+ OtherNodes = proxied_rpc (Config , Node , erlang , nodes , []),
411+ lists :sort ([Node | OtherNodes -- [Peer ]]).
412+
413+ get_per_node_khepri_status (Config ) ->
414+ Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
415+ maps :from_list (
416+ lists :map (
417+ fun (Node ) ->
418+ Status = proxied_rpc (Config , Node , rabbit_khepri , status , []),
419+ {Node , Status }
420+ end , Nodes )).
421+
252422rolling_restart (Config ) ->
253423 Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
254424
0 commit comments