diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl index 2bf52b3a01c8..f8bf3d0ea13c 100644 --- a/deps/rabbit/src/rabbit_db.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -328,7 +328,7 @@ list_in_khepri(Path) -> Objects :: [term()]. list_in_khepri(Path, Options) -> - case rabbit_khepri:match(Path, Options) of + case rabbit_khepri:get_many(Path, Options) of {ok, Map} -> maps:values(Map); _ -> [] end. diff --git a/deps/rabbit/src/rabbit_db_cluster.erl b/deps/rabbit/src/rabbit_db_cluster.erl index a11ba80af42e..e13c2f01307e 100644 --- a/deps/rabbit/src/rabbit_db_cluster.erl +++ b/deps/rabbit/src/rabbit_db_cluster.erl @@ -279,7 +279,7 @@ forget_member_using_khepri(_Node, true) -> #{domain => ?RMQLOG_DOMAIN_DB}), {error, not_supported}; forget_member_using_khepri(Node, false = _RemoveWhenOffline) -> - rabbit_khepri:leave_cluster(Node). + rabbit_khepri:remove_member(Node). %% ------------------------------------------------------------------- %% Cluster update. diff --git a/deps/rabbit/src/rabbit_db_rtparams.erl b/deps/rabbit/src/rabbit_db_rtparams.erl index 68decc6ca9c3..39141903aaed 100644 --- a/deps/rabbit/src/rabbit_db_rtparams.erl +++ b/deps/rabbit/src/rabbit_db_rtparams.erl @@ -357,7 +357,7 @@ delete_vhost_in_mnesia_tx(VHostName) -> delete_vhost_in_khepri(VHostName) -> Pattern = khepri_vhost_rp_path( VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), - case rabbit_khepri:adv_delete_many(Pattern) of + case rabbit_khepri:adv_delete(Pattern) of {ok, NodePropsMap} -> RTParams = maps:fold( diff --git a/deps/rabbit/src/rabbit_db_user.erl b/deps/rabbit/src/rabbit_db_user.erl index 81deccfa6c03..3a700a3b35b1 100644 --- a/deps/rabbit/src/rabbit_db_user.erl +++ b/deps/rabbit/src/rabbit_db_user.erl @@ -402,7 +402,7 @@ match_user_permissions_in_mnesia_tx(Username, VHostName) -> match_user_permissions_in_khepri('_' = _Username, '_' = _VHostName) -> Path = khepri_user_permission_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), - case rabbit_khepri:match(Path) of + case rabbit_khepri:get_many(Path) of {ok, Map} -> maps:values(Map); _ -> diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index ae43ae8e51ca..be9d5b42b06f 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -2,7 +2,8 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2023-2025 Broadcom. All Rights Reserved. The term “Broadcom” +%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% %% @doc Khepri database uses wrapper. @@ -98,64 +99,73 @@ -include("include/rabbit_khepri.hrl"). +%% Initialisation. -export([setup/0, setup/1, - register_projections/0, init/1, - can_join_cluster/1, + reset/0, + + dir/0, + get_ra_cluster_name/0, + get_store_id/0, + root_path/0, + + info/0]). + +%% Clustering. +-export([can_join_cluster/1, add_member/2, + do_join/1, %% Internal RPC from this module. remove_member/1, + members/0, locally_known_members/0, nodes/0, locally_known_nodes/0, - get_ra_cluster_name/0, - get_store_id/0, - transfer_leadership/1, - fence/1, - is_empty/0, - create/2, - adv_create/2, - update/2, - cas/3, - fold/3, fold/4, - foreach/2, - filter/2, + check_cluster_consistency/0, + node_info/0, %% Internal RPC from this module. + cluster_status_from_khepri/0, + transfer_leadership/1]). - get/1, - get/2, +%% CLI command support. +-export([force_shrink_member_to_current_member/0, + status/0, + cli_cluster_status/0]). + +%% "Proxy" functions to Khepri query/update API. +-export([is_empty/0, + + get/1, get/2, + adv_get/1, adv_get/2, + get_many/1, get_many/2, + adv_get_many/1, adv_get_many/2, + exists/1, exists/2, count/1, count/2, - get_many/1, - adv_get/1, - adv_get_many/1, - match/1, - match/2, - exists/1, - list/1, - list_child_nodes/1, - count_children/1, + fold/3, fold/4, + foreach/2, foreach/3, + map/2, map/3, + filter/2, filter/3, put/2, put/3, - adv_put/2, - clear_payload/1, - delete/1, delete/2, - delete_or_fail/1, - adv_delete_many/1, + adv_put/2, adv_put/3, + create/2, create/3, + adv_create/2, adv_create/3, + update/2, update/3, + adv_update/2, adv_update/3, - transaction/1, - transaction/2, - transaction/3, + delete/1, delete/2, + adv_delete/1, adv_delete/2, + clear_payload/1, clear_payload/2, - clear_store/0, + transaction/1, transaction/2, transaction/3, - dir/0, - info/0, - root_path/0, + fence/1, handle_async_ret/1, - status/0]). + delete_or_fail/1]). + %% Used during migration to join the standalone Khepri nodes and form the %% equivalent cluster -export([khepri_db_migration_enable/1, @@ -163,20 +173,10 @@ is_enabled/0, is_enabled/1, get_feature_state/0, get_feature_state/1, handle_fallback/1]). --export([do_join/1]). -%% To add the current node to an existing cluster --export([leave_cluster/1]). --export([check_cluster_consistency/0, - check_cluster_consistency/2, - node_info/0]). --export([reset/0]). --export([cluster_status_from_khepri/0, - cli_cluster_status/0]). - --export([force_shrink_member_to_current_member/0]). -ifdef(TEST). --export([force_metadata_store/1, +-export([register_projections/0, + force_metadata_store/1, clear_forced_metadata_store/0]). -endif. @@ -238,19 +238,35 @@ ]). %% ------------------------------------------------------------------- -%% API wrapping Khepri. +%% Khepri integration initialisation. %% ------------------------------------------------------------------- -spec setup() -> ok | no_return(). -%% @private +%% @doc Starts the local Khepri store. +%% +%% @see setup/1. setup() -> setup(rabbit_prelaunch:get_context()). --spec setup(map()) -> ok | no_return(). -%% @private +-spec setup(Context) -> ok | no_return() when + Context :: map(). +%% @doc Starts the local Khepri store. +%% +%% Before starting the Khepri store, it ensures that the underlying Ra system +%% we want to use is also running. +%% +%% This function is idempotent whether the Khepri store is started for the +%% first time or it is restarted. +%% +%% This function blocks until a leader is elected. +%% +%% The Khepri application must be running. +%% +%% If it fails to start the Khepri store or if it reaches a timeout waiting for +%% a leader, this function exits. -setup(_) -> +setup(_Context) -> ?LOG_DEBUG("Starting Khepri-based " ?RA_FRIENDLY_NAME), ok = ensure_ra_system_started(), Timeout = application:get_env(rabbit, khepri_default_timeout, 30000), @@ -279,17 +295,25 @@ setup(_) -> exit(Error) end. +ensure_ra_system_started() -> + {ok, _} = application:ensure_all_started(khepri), + ok = rabbit_ra_systems:ensure_ra_system_started(?RA_SYSTEM). + retry_timeout() -> case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of - {ok, T} -> T; - undefined -> 300_000 + {ok, T} when is_integer(T) andalso T >= 0 -> T; + undefined -> 300_000 end. -%% @private - -spec init(IsVirgin) -> Ret when IsVirgin :: boolean(), Ret :: ok | timeout_error(). +%% @doc Ensures the store has caught up with the cluster. +%% +%% In addition to making sure the local Khepri store is on the same page as the +%% leader, it initialises the Khepri projections if this node is virgin. +%% +%% Finally, it requests the deletion of transient queues on this node. init(IsVirgin) -> case members() of @@ -307,10 +331,8 @@ init(IsVirgin) -> "up to the Raft cluster leader", [], #{domain => ?RMQLOG_DOMAIN_DB}), ok ?= case IsVirgin of - true -> - register_projections(); - false -> - ok + true -> register_projections(); + false -> ok end, %% Delete transient queues on init. %% Note that we also do this in the @@ -332,6 +354,93 @@ await_replication() -> #{domain => ?RMQLOG_DOMAIN_DB}), fence(Timeout). +-spec reset() -> ok | no_return(). +%% @doc Reset and stops the local Khepri store. +%% +%% This function first ensures that the local Khepri store is running. +%% +%% Then it resets the store. This includes removing it from its cluster if +%% any, and deleting all tree nodes. +%% +%% Finally, it stops the store and deteles files on disk. +%% +%% The Khepri application is left running. +%% +%% RabbitMQ must be stopped on this Erlang node. This functions throws an +%% exception if it is called while RabbitMQ is still running. +%% +%% @private + +reset() -> + case rabbit:is_running() of + false -> + %% Rabbit should be stopped, but Khepri needs to be running. + %% Restart it. + ok = setup(), + ok = khepri_cluster:reset(?RA_CLUSTER_NAME), + ok = khepri:stop(?RA_CLUSTER_NAME), + + _ = file:delete(rabbit_guid:filename()), + ok; + true -> + throw({error, rabbitmq_unexpectedly_running}) + end. + +-spec dir() -> Dir when + Dir :: file:filename_all(). +%% @doc Returns the Khepri store directory. +%% +%% This corresponds to the underlying Ra system's directory. + +dir() -> + DataDir = rabbit_mnesia:dir(), + StoreDir = filename:join(DataDir, atom_to_list(?STORE_ID)), + StoreDir. + +-spec get_ra_cluster_name() -> RaClusterName when + RaClusterName :: ra:cluster_name(). +%% @doc Returns the Ra cluster name. + +get_ra_cluster_name() -> + ?RA_CLUSTER_NAME. + +-spec get_store_id() -> StoreId when + StoreId :: khepri:store_id(). +%% @doc Returns the Khepri store identifier. + +get_store_id() -> + ?STORE_ID. + +-spec root_path() -> RootPath when + RootPath :: khepri_path:path(). +%% @doc Returns the path where RabbitMQ stores every metadata. +%% +%% This path must be prepended to all paths used by RabbitMQ subsystems. + +root_path() -> + ?RABBITMQ_KHEPRI_ROOT_PATH. + +info() -> + ok = setup(), + khepri:info(?STORE_ID). + +%% ------------------------------------------------------------------- +%% Clustering. +%% ------------------------------------------------------------------- + +-spec can_join_cluster(DiscoveryNode) -> Ret when + DiscoveryNode :: node(), + Ret :: {ok, ClusterNodes} | {error, any()}, + ClusterNodes :: [node()]. +%% @doc Indicates if this node can join `DiscoveryNode' cluster. +%% +%% At the level of Khepri, it is always possible to join a remote cluster for +%% now. Therefore this function only queries the list of members in +%% `DiscoveryNode' cluster and returns it. +%% +%% @returns an `ok' tuple with the list of members in `DiscoveryNode' cluster, +%% or an error tuple. +%% %% @private can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) -> @@ -339,7 +448,7 @@ can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) -> try ClusterNodes0 = erpc:call( DiscoveryNode, - rabbit_khepri, locally_known_nodes, []), + ?MODULE, locally_known_nodes, []), ClusterNodes1 = ClusterNodes0 -- [ThisNode], {ok, ClusterNodes1} catch @@ -347,6 +456,21 @@ can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) -> {error, Reason} end. +-spec add_member(JoiningNode, JoinedNode | JoinedCluster) -> Ret when + JoiningNode :: node(), + JoinedNode :: node(), + JoinedCluster :: [node()], + Ret :: ok | {error, any()}. +%% @doc Adds `JoiningNode' to `JoinedNode''s cluster. +%% +%% If a list of nodes is passed as `JoinedCluster', this function will pick +%% this node if it is part of the list and the Khepri store is running, or the +%% first node in the list that runs the Khepri store. +%% +%% The actual join code runs on the node that wants to join a cluster. +%% Therefore, if `JoiningNode' is this node, the code runs locally. Otherwise, +%% this function does an RPC call to execute the remote function. +%% %% @private add_member(JoiningNode, JoinedNode) @@ -355,7 +479,7 @@ add_member(JoiningNode, JoinedNode) post_add_member(JoiningNode, JoinedNode, Ret); add_member(JoiningNode, JoinedNode) when is_atom(JoinedNode) -> Ret = rabbit_misc:rpc_call( - JoiningNode, rabbit_khepri, do_join, [JoinedNode]), + JoiningNode, ?MODULE, do_join, [JoinedNode]), post_add_member(JoiningNode, JoinedNode, Ret); add_member(JoiningNode, [_ | _] = Cluster) -> case pick_node_in_cluster(Cluster) of @@ -396,6 +520,22 @@ pick_node_in_cluster([_ | _] = Cluster) -> {error, {no_nodes_to_cluster_with, Cluster}} end. +-spec do_join(RemoteNode) -> Ret when + RemoteNode :: node(), + Ret :: ok | {error, any()}. +%% @doc Adds this node to `RemoteNode''s cluster. +%% +%% Before adding this node to the remote node's cluster, this function call +%% {@link setup/0} to ensure the Khepri store is running. +%% +%% It also pings the remote node to make sure it is reachable. +%% +%% If RabbitMQ is still running on the Erlang node, it will put it in +%% maintenance before proceeding. It will resume RabbitMQ after the join (or if +%% the join fails). +%% +%% @private + do_join(RemoteNode) when RemoteNode =/= node() -> ThisNode = node(), @@ -408,7 +548,6 @@ do_join(RemoteNode) when RemoteNode =/= node() -> %% Ensure the local Khepri store is running before we can reset it. It %% could be stopped if RabbitMQ is not running for instance. ok = setup(), - khepri:info(?RA_CLUSTER_NAME), %% Ensure the remote node is reachable before we add it. case net_adm:ping(RemoteNode) of @@ -470,14 +609,27 @@ post_add_member(JoiningNode, JoinedNode, Error) -> #{domain => ?RMQLOG_DOMAIN_GLOBAL}), Error. +-spec remove_member(NodeToRemove) -> ok when + NodeToRemove :: node(). +%% @doc Removes `NodeToRemove' from its cluster. +%% +%% This function runs on the node calling it. +%% +%% If `NodeToRemove' is reachable, this function calls the regular {@link +%% khepri_cluster:reset/1} on `NodeToRemove'. If it is unreachable, this +%% function call Ra on this node to remove the remote member. +%% %% @private -leave_cluster(Node) -> - retry_khepri_op(fun() -> remove_member(Node) end, 60). +remove_member(Node) -> + retry_khepri_op(fun() -> do_remove_member(Node) end, 60). +-spec do_remove_member(NodeToRemove) -> Ret when + NodeToRemove :: node(), + Ret :: ok | {error, any()}. %% @private -remove_member(NodeToRemove) when NodeToRemove =/= node() -> +do_remove_member(NodeToRemove) when NodeToRemove =/= node() -> ?LOG_DEBUG( "Trying to remove node ~s from Khepri cluster \"~s\" on node ~s", [NodeToRemove, ?RA_CLUSTER_NAME, node()], @@ -525,7 +677,7 @@ remove_reachable_member(NodeToRemove) -> [NodeToRemove, ?RA_CLUSTER_NAME], #{domain => ?RMQLOG_DOMAIN_GLOBAL}), ok; - Error -> + {error, _} = Error -> ?LOG_ERROR( "Failed to remove remote node ~s from Khepri " "cluster \"~s\": ~p", @@ -563,33 +715,25 @@ remove_down_member(NodeToRemove) -> {error, timeout} end. -%% @private - -reset() -> - case rabbit:is_running() of - false -> - %% Rabbit should be stopped, but Khepri needs to be running. - %% Restart it. - ok = setup(), - ok = khepri_cluster:reset(?RA_CLUSTER_NAME), - ok = khepri:stop(?RA_CLUSTER_NAME), - - _ = file:delete(rabbit_guid:filename()), - ok; - true -> - throw({error, rabbitmq_unexpectedly_running}) +retry_khepri_op(Fun, 0) -> + Fun(); +retry_khepri_op(Fun, N) -> + case Fun() of + {error, {no_more_servers_to_try, Reasons}} = Err -> + case lists:member({error,cluster_change_not_permitted}, Reasons) of + true -> + timer:sleep(1000), + retry_khepri_op(Fun, N - 1); + false -> + Err + end; + {error, cluster_change_not_permitted} -> + timer:sleep(1000), + retry_khepri_op(Fun, N - 1); + Any -> + Any end. -%% @private - -force_shrink_member_to_current_member() -> - ok = ra_server_proc:force_shrink_members_to_current_member( - {?RA_CLUSTER_NAME, node()}). - -ensure_ra_system_started() -> - {ok, _} = application:ensure_all_started(khepri), - ok = rabbit_ra_systems:ensure_ra_system_started(?RA_SYSTEM). - -spec members() -> Members when Members :: [ra:server_id()]. %% @doc Returns the list of Ra server identifiers that are part of the @@ -652,151 +796,11 @@ locally_known_nodes() -> {error, _Reason} -> [] end. --spec get_ra_cluster_name() -> RaClusterName when - RaClusterName :: ra:cluster_name(). -%% @doc Returns the Ra cluster name. - -get_ra_cluster_name() -> - ?RA_CLUSTER_NAME. - --spec get_store_id() -> StoreId when - StoreId :: khepri:store_id(). -%% @doc Returns the Khepri store identifier. - -get_store_id() -> - ?STORE_ID. - --spec dir() -> Dir when - Dir :: file:filename_all(). -%% @doc Returns the Khepri store directory. +-spec check_cluster_consistency() -> Ret when + Ret :: ok | {error, any()}. +%% @doc Performs various checks to validate that this node is healthy at the +%% metadata store level. %% -%% This corresponds to the underlying Ra system's directory. - -dir() -> - filename:join(rabbit_mnesia:dir(), atom_to_list(?STORE_ID)). - --spec transfer_leadership([node()]) -> - {ok, in_progress | undefined | node()} | {error, any()}. -%% @private - -transfer_leadership([]) -> - rabbit_log:warning("Skipping leadership transfer of metadata store: no candidate " - "(online, not under maintenance) nodes to transfer to!"); -transfer_leadership(TransferCandidates) -> - case get_feature_state() of - enabled -> - transfer_leadership0(TransferCandidates); - _ -> - rabbit_log:info("Skipping leadership transfer of metadata store: Khepri is not enabled") - end. - --spec transfer_leadership0([node()]) -> - {ok, in_progress | undefined | node()} | {error, any()}. -transfer_leadership0([]) -> - rabbit_log:warning("Khepri clustering: failed to transfer leadership, no more candidates available", []), - {error, not_migrated}; -transfer_leadership0([Destination | TransferCandidates]) -> - rabbit_log:info("Khepri clustering: transferring leadership to node ~p", [Destination]), - case ra_leaderboard:lookup_leader(?STORE_ID) of - {Name, Node} = Id when Node == node() -> - Timeout = khepri_app:get_default_timeout(), - case ra:transfer_leadership(Id, {Name, Destination}) of - ok -> - case ra:members(Id, Timeout) of - {_, _, {_, NewNode}} -> - rabbit_log:info("Khepri clustering: successfully transferred leadership to node ~p", [Destination]), - {ok, NewNode}; - {timeout, _} -> - rabbit_log:warning("Khepri clustering: maybe failed to transfer leadership to node ~p, members query has timed out", [Destination]), - {error, not_migrated} - end; - already_leader -> - rabbit_log:info("Khepri clustering: successfully transferred leadership to node ~p, already the leader", [Destination]), - {ok, Destination}; - {error, Reason} -> - rabbit_log:warning("Khepri clustering: failed to transfer leadership to node ~p with the following error ~p", [Destination, Reason]), - transfer_leadership0(TransferCandidates); - {timeout, _} -> - rabbit_log:warning("Khepri clustering: failed to transfer leadership to node ~p with a timeout", [Destination]), - transfer_leadership0(TransferCandidates) - end; - {_, Node} -> - rabbit_log:info("Khepri clustering: skipping leadership transfer, leader is already in node ~p", [Node]), - {ok, Node}; - undefined -> - rabbit_log:info("Khepri clustering: skipping leadership transfer, leader not elected", []), - {ok, undefined} - end. - -%% @private - -status() -> - Nodes = rabbit_nodes:all_running(), - [try - Metrics = get_ra_key_metrics(N), - #{state := RaftState, - membership := Membership, - commit_index := Commit, - term := Term, - last_index := Last, - last_applied := LastApplied, - last_written_index := LastWritten, - snapshot_index := SnapIdx, - machine_version := MacVer} = Metrics, - [{<<"Node Name">>, N}, - {<<"Raft State">>, RaftState}, - {<<"Membership">>, Membership}, - {<<"Last Log Index">>, Last}, - {<<"Last Written">>, LastWritten}, - {<<"Last Applied">>, LastApplied}, - {<<"Commit Index">>, Commit}, - {<<"Snapshot Index">>, SnapIdx}, - {<<"Term">>, Term}, - {<<"Machine Version">>, MacVer} - ] - catch - _:Error -> - [{<<"Node Name">>, N}, - {<<"Raft State">>, Error}, - {<<"Membership">>, <<>>}, - {<<"Last Log Index">>, <<>>}, - {<<"Last Written">>, <<>>}, - {<<"Last Applied">>, <<>>}, - {<<"Commit Index">>, <<>>}, - {<<"Snapshot Index">>, <<>>}, - {<<"Term">>, <<>>}, - {<<"Machine Version">>, <<>>} - ] - end || N <- Nodes]. - -%% @private - -get_ra_key_metrics(Node) -> - ServerId = {?RA_CLUSTER_NAME, Node}, - Metrics0 = ra:key_metrics(ServerId), - MacVer = try - erpc:call(Node, khepri_machine, version, []) - catch - _:{exception, undef, [{khepri_machine, version, _, _} | _]} -> - 0 - end, - Metrics1 = Metrics0#{machine_version => MacVer}, - Metrics1. - -%% @private - -cli_cluster_status() -> - case rabbit:is_running() of - true -> - Nodes = locally_known_nodes(), - [{nodes, [{disc, Nodes}]}, - {running_nodes, [N || N <- Nodes, rabbit_nodes:is_running(N)]}, - {cluster_name, rabbit_nodes:cluster_name()}, - {partitions, []}]; - false -> - [] - end. - %% @private check_cluster_consistency() -> @@ -833,8 +837,13 @@ check_cluster_consistency() -> E end. -nodes_excl_me(Nodes) -> Nodes -- [node()]. - +-spec check_cluster_consistency(Node, CheckNodesConsistency) -> Ret when + Node :: node(), + CheckNodesConsistency :: boolean(), + Ret :: {ok, Status} | {error, any()}, + Status :: {All, Running}, + All :: [node()], + Running :: [node()]. %% @private check_cluster_consistency(Node, CheckNodesConsistency) -> @@ -859,9 +868,33 @@ check_cluster_consistency(Node, CheckNodesConsistency) -> {ok, Status} end. +-spec remote_node_info(Node) -> Info when + Node :: node(), + Info :: {OtpVersion, RabbitMQVersion, ClusterStatus}, + OtpVersion :: string(), + RabbitMQVersion :: string(), + ClusterStatus :: {ok, {All, Running}} | {error, any()}, + All :: [node()], + Running :: [node()]. +%% @private + remote_node_info(Node) -> rpc:call(Node, ?MODULE, node_info, []). +-spec node_info() -> Info when + Info :: {OtpVersion, RabbitMQVersion, ClusterStatus}, + OtpVersion :: string(), + RabbitMQVersion :: string(), + ClusterStatus :: {ok, {All, Running}} | {error, khepri_not_running}, + All :: [node()], + Running :: [node()]. +%% @private + +node_info() -> + {rabbit_misc:otp_release(), + rabbit_misc:version(), + cluster_status_from_khepri()}. + check_nodes_consistency(Node, {RemoteAllNodes, _RemoteRunningNodes}) -> case me_in_nodes(RemoteAllNodes) of true -> @@ -871,44 +904,226 @@ check_nodes_consistency(Node, {RemoteAllNodes, _RemoteRunningNodes}) -> format_inconsistent_cluster_message(node(), Node)}} end. -format_inconsistent_cluster_message(Thinker, Dissident) -> - rabbit_misc:format("Khepri: node ~tp thinks it's clustered " - "with node ~tp, but ~tp disagrees", - [Thinker, Dissident, Dissident]). +format_inconsistent_cluster_message(Thinker, Dissident) -> + rabbit_misc:format("Khepri: node ~tp thinks it's clustered " + "with node ~tp, but ~tp disagrees", + [Thinker, Dissident, Dissident]). + +nodes_excl_me(Nodes) -> Nodes -- [node()]. +me_in_nodes(Nodes) -> lists:member(node(), Nodes). + +-spec cluster_status_from_khepri() -> ClusterStatus when + ClusterStatus :: {ok, {All, Running}} | {error, khepri_not_running}, + All :: [node()], + Running :: [node()]. +%% @private + +cluster_status_from_khepri() -> + try + _ = get_ra_key_metrics(node()), + All = locally_known_nodes(), + Running = lists:filter( + fun(N) -> + rabbit_nodes:is_running(N) + end, All), + {ok, {All, Running}} + catch + _:_ -> + {error, khepri_not_running} + end. + +-spec transfer_leadership(Candidates) -> Ret when + Candidates :: [node()], + Ret :: {ok, Result} | {error, any()}, + Result :: node() | undefined. +%% @private + +transfer_leadership([]) -> + ?LOG_WARNING( + "Skipping leadership transfer of metadata store: no candidate " + "(online, not under maintenance) nodes to transfer to!", + #{domain => ?RMQLOG_DOMAIN_DB}), + {error, no_candidates}; +transfer_leadership(TransferCandidates) -> + case get_feature_state() of + enabled -> + do_transfer_leadership(TransferCandidates); + _ -> + ?LOG_INFO( + "Skipping leadership transfer of metadata store: Khepri " + "is not enabled", + #{domain => ?RMQLOG_DOMAIN_DB}), + {error, khepri_not_enabled} + end. + +do_transfer_leadership([]) -> + ?LOG_WARNING( + "Khepri clustering: failed to transfer leadership, no more " + "candidates available", + #{domain => ?RMQLOG_DOMAIN_DB}), + {error, not_migrated}; +do_transfer_leadership([Destination | TransferCandidates]) -> + ?LOG_INFO( + "Khepri clustering: transferring leadership to node ~p", + [Destination], + #{domain => ?RMQLOG_DOMAIN_DB}), + case ra_leaderboard:lookup_leader(?STORE_ID) of + {Name, Node} = Id when Node == node() -> + Timeout = khepri_app:get_default_timeout(), + case ra:transfer_leadership(Id, {Name, Destination}) of + ok -> + case ra:members(Id, Timeout) of + {_, _, {_, NewNode}} -> + ?LOG_INFO( + "Khepri clustering: successfully " + "transferred leadership to node ~p", + [Destination], + #{domain => ?RMQLOG_DOMAIN_DB}), + {ok, NewNode}; + {timeout, _} -> + ?LOG_WARNING( + "Khepri clustering: maybe failed to transfer " + "leadership to node ~p, members query has " + "timed out", + [Destination], + #{domain => ?RMQLOG_DOMAIN_DB}), + {error, not_migrated} + end; + already_leader -> + ?LOG_INFO( + "Khepri clustering: successfully transferred " + "leadership to node ~p, already the leader", + [Destination], + #{domain => ?RMQLOG_DOMAIN_DB}), + {ok, Destination}; + {error, Reason} -> + ?LOG_WARNING( + "Khepri clustering: failed to transfer leadership " + "to node ~p with the following error ~p", + [Destination, Reason], + #{domain => ?RMQLOG_DOMAIN_DB}), + do_transfer_leadership(TransferCandidates); + {timeout, _} -> + ?LOG_WARNING( + "Khepri clustering: failed to transfer leadership " + "to node ~p with a timeout", + [Destination], + #{domain => ?RMQLOG_DOMAIN_DB}), + do_transfer_leadership(TransferCandidates) + end; + {_, Node} -> + ?LOG_INFO( + "Khepri clustering: skipping leadership transfer, leader is " + "already on node ~p", + [Node], + #{domain => ?RMQLOG_DOMAIN_DB}), + {ok, Node}; + undefined -> + ?LOG_INFO( + "Khepri clustering: skipping leadership transfer, leader " + "not elected", + #{domain => ?RMQLOG_DOMAIN_DB}), + {ok, undefined} + end. + +%% ------------------------------------------------------------------- +%% CLI command support functions. +%% ------------------------------------------------------------------- + +-spec force_shrink_member_to_current_member() -> ok. +%% @doc Shrinks the local Khepri store to be alone in its cluster.d +%% +%% The difference with a reset is that it does not lose its data. +%% +%% This is only used by the CLI's `force_standalone_khepri_boot' command. +%% +%% @private -me_in_nodes(Nodes) -> lists:member(node(), Nodes). +force_shrink_member_to_current_member() -> + ok = ra_server_proc:force_shrink_members_to_current_member( + {?RA_CLUSTER_NAME, node()}). +-spec status() -> Status when + Status :: [Metrics], + Metrics :: [{Key, Value}], + Key :: binary(), + Value :: any(). %% @private -node_info() -> - {rabbit_misc:otp_release(), - rabbit_misc:version(), - cluster_status_from_khepri()}. +status() -> + Nodes = rabbit_nodes:all_running(), + [try + Metrics = get_ra_key_metrics(N), + #{state := RaftState, + membership := Membership, + commit_index := Commit, + term := Term, + last_index := Last, + last_applied := LastApplied, + last_written_index := LastWritten, + snapshot_index := SnapIdx, + machine_version := MacVer} = Metrics, + [{<<"Node Name">>, N}, + {<<"Raft State">>, RaftState}, + {<<"Membership">>, Membership}, + {<<"Last Log Index">>, Last}, + {<<"Last Written">>, LastWritten}, + {<<"Last Applied">>, LastApplied}, + {<<"Commit Index">>, Commit}, + {<<"Snapshot Index">>, SnapIdx}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MacVer} + ] + catch + _:Error -> + [{<<"Node Name">>, N}, + {<<"Raft State">>, Error}, + {<<"Membership">>, <<>>}, + {<<"Last Log Index">>, <<>>}, + {<<"Last Written">>, <<>>}, + {<<"Last Applied">>, <<>>}, + {<<"Commit Index">>, <<>>}, + {<<"Snapshot Index">>, <<>>}, + {<<"Term">>, <<>>}, + {<<"Machine Version">>, <<>>} + ] + end || N <- Nodes]. +-spec get_ra_key_metrics(Node) -> Metrics when + Node :: node(), + Metrics :: map(). %% @private -cluster_status_from_khepri() -> - try - _ = get_ra_key_metrics(node()), - All = locally_known_nodes(), - Running = lists:filter( - fun(N) -> - rabbit_nodes:is_running(N) - end, All), - {ok, {All, Running}} - catch - _:_ -> - {error, khepri_not_running} - end. +get_ra_key_metrics(Node) -> + ServerId = {?RA_CLUSTER_NAME, Node}, + Metrics0 = ra:key_metrics(ServerId), + MacVer = try + erpc:call(Node, khepri_machine, version, []) + catch + _:{exception, undef, [{khepri_machine, version, _, _} | _]} -> + 0 + end, + Metrics1 = Metrics0#{machine_version => MacVer}, + Metrics1. --spec root_path() -> RootPath when - RootPath :: khepri_path:path(). -%% @doc Returns the path where RabbitMQ stores every metadata. -%% -%% This path must be prepended to all paths used by RabbitMQ subsystems. +-spec cli_cluster_status() -> Status when + Status :: [{nodes, [{disc, [node()]}]} | + {running_nodes, [node()]} | + {cluster_name, binary()} | + {partitions, []}]. +%% @private -root_path() -> - ?RABBITMQ_KHEPRI_ROOT_PATH. +cli_cluster_status() -> + case rabbit:is_running() of + true -> + Nodes = locally_known_nodes(), + [{nodes, [{disc, Nodes}]}, + {running_nodes, [N || N <- Nodes, rabbit_nodes:is_running(N)]}, + {cluster_name, rabbit_nodes:cluster_name()}, + {partitions, []}]; + false -> + [] + end. %% ------------------------------------------------------------------- %% "Proxy" functions to Khepri API. @@ -922,127 +1137,136 @@ root_path() -> %% They are some additional functions too, because they are useful in %% RabbitMQ. They might be moved to Khepri in the future. -is_empty() -> khepri:is_empty(?STORE_ID). +is_empty() -> + khepri:is_empty(?STORE_ID). + +get(PathPattern) -> + khepri:get(?STORE_ID, PathPattern). + +get(PathPattern, Options) -> + khepri:get(?STORE_ID, PathPattern, Options). -create(Path, Data) -> - khepri:create(?STORE_ID, Path, Data, ?DEFAULT_COMMAND_OPTIONS). -adv_create(Path, Data) -> adv_create(Path, Data, #{}). -adv_create(Path, Data, Options0) -> - Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0), - khepri_adv:create(?STORE_ID, Path, Data, Options). -update(Path, Data) -> - khepri:update(?STORE_ID, Path, Data, ?DEFAULT_COMMAND_OPTIONS). -cas(Path, Pattern, Data) -> - khepri:compare_and_swap( - ?STORE_ID, Path, Pattern, Data, ?DEFAULT_COMMAND_OPTIONS). +adv_get(PathPattern) -> + khepri_adv:get(?STORE_ID, PathPattern). + +adv_get(PathPattern, Options) -> + khepri_adv:get(?STORE_ID, PathPattern, Options). + +get_many(PathPattern) -> + khepri:get_many(?STORE_ID, PathPattern). -fold(Path, Pred, Acc) -> - khepri:fold(?STORE_ID, Path, Pred, Acc). +get_many(PathPattern, Options) -> + khepri:get_many(?STORE_ID, PathPattern, Options). -fold(Path, Pred, Acc, Options) -> - khepri:fold(?STORE_ID, Path, Pred, Acc, Options). +adv_get_many(PathPattern) -> + khepri_adv:get_many(?STORE_ID, PathPattern). -foreach(Path, Pred) -> - khepri:foreach(?STORE_ID, Path, Pred). +adv_get_many(PathPattern, Options) -> + khepri_adv:get_many(?STORE_ID, PathPattern, Options). -filter(Path, Pred) -> - khepri:filter(?STORE_ID, Path, Pred). +exists(PathPattern) -> + khepri:exists(?STORE_ID, PathPattern). -get(Path) -> - khepri:get(?STORE_ID, Path). +exists(PathPattern, Options) -> + khepri:exists(?STORE_ID, PathPattern, Options). -get(Path, Options) -> - khepri:get(?STORE_ID, Path, Options). +%% `count/{1,2}' sets the `favor => low_latency' option. count(PathPattern) -> - khepri:count(?STORE_ID, PathPattern, #{favor => low_latency}). + count(PathPattern, #{}). -count(Path, Options) -> +count(PathPattern, Options) -> Options1 = Options#{favor => low_latency}, - khepri:count(?STORE_ID, Path, Options1). + khepri:count(?STORE_ID, PathPattern, Options1). -get_many(PathPattern) -> - khepri:get_many(?STORE_ID, PathPattern). +fold(PathPattern, Pred, Acc) -> + khepri:fold(?STORE_ID, PathPattern, Pred, Acc). -adv_get(Path) -> - khepri_adv:get(?STORE_ID, Path). +fold(PathPattern, Pred, Acc, Options) -> + khepri:fold(?STORE_ID, PathPattern, Pred, Acc, Options). -adv_get_many(PathPattern) -> - khepri_adv:get_many(?STORE_ID, PathPattern). +foreach(PathPattern, Pred) -> + khepri:foreach(?STORE_ID, PathPattern, Pred). -match(Path) -> - match(Path, #{}). +foreach(PathPattern, Pred, Options) -> + khepri:foreach(?STORE_ID, PathPattern, Pred, Options). -match(Path, Options) -> - khepri:get_many(?STORE_ID, Path, Options). +map(PathPattern, Pred) -> + khepri:map(?STORE_ID, PathPattern, Pred). -exists(Path) -> khepri:exists(?STORE_ID, Path). +map(PathPattern, Pred, Options) -> + khepri:map(?STORE_ID, PathPattern, Pred, Options). -list(Path) -> - khepri:get_many( - ?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR]). +filter(PathPattern, Pred) -> + khepri:filter(?STORE_ID, PathPattern, Pred). -list_child_nodes(Path) -> - Options = #{props_to_return => [child_names]}, - case khepri_adv:get_many(?STORE_ID, Path, Options) of - {ok, Result} -> - case maps:values(Result) of - [#{child_names := ChildNames}] -> - {ok, ChildNames}; - [] -> - [] - end; - Error -> - Error - end. +filter(PathPattern, Pred, Options) -> + khepri:filter(?STORE_ID, PathPattern, Pred, Options). -count_children(Path) -> - Options = #{props_to_return => [child_list_length]}, - case khepri_adv:get_many(?STORE_ID, Path, Options) of - {ok, Map} -> - lists:sum([L || #{child_list_length := L} <- maps:values(Map)]); - _ -> - 0 - end. +put(PathPattern, Data) -> + put(PathPattern, Data, #{}). -clear_payload(Path) -> - khepri:clear_payload(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS). +put(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:put(?STORE_ID, PathPattern, Data, Options1). -delete(Path) -> - khepri:delete_many(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS). +adv_put(PathPattern, Data) -> + adv_put(PathPattern, Data, #{}). -delete(Path, Options0) -> - Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0), - khepri:delete_many(?STORE_ID, Path, Options). +adv_put(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri_adv:put(?STORE_ID, PathPattern, Data, Options1). -delete_or_fail(Path) -> - case khepri_adv:delete(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS) of - {ok, #{Path := NodeProps}} -> - case maps:size(NodeProps) of - 0 -> {error, {node_not_found, #{}}}; - _ -> ok - end; - {ok, #{} = NodePropsMap} when NodePropsMap =:= #{} -> - {error, {node_not_found, #{}}}; - {error, _} = Error -> - Error - end. +create(PathPattern, Data) -> + create(PathPattern, Data, #{}). + +create(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:create(?STORE_ID, PathPattern, Data, Options1). -adv_delete_many(Path) -> - khepri_adv:delete_many(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS). +adv_create(PathPattern, Data) -> + adv_create(PathPattern, Data, #{}). -put(PathPattern, Data) -> - khepri:put( - ?STORE_ID, PathPattern, Data, ?DEFAULT_COMMAND_OPTIONS). +adv_create(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri_adv:create(?STORE_ID, PathPattern, Data, Options1). -put(PathPattern, Data, Options0) -> - Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0), - khepri:put( - ?STORE_ID, PathPattern, Data, Options). +update(PathPattern, Data) -> + update(PathPattern, Data, #{}). -adv_put(PathPattern, Data) -> - khepri_adv:put( - ?STORE_ID, PathPattern, Data, ?DEFAULT_COMMAND_OPTIONS). +update(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:update(?STORE_ID, PathPattern, Data, Options1). + +adv_update(PathPattern, Data) -> + adv_update(PathPattern, Data, #{}). + +adv_update(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri_adv:update(?STORE_ID, PathPattern, Data, Options1). + +%% `delete/{1,2}' calls `khepri:delete_many/2. + +delete(PathPattern) -> + delete(PathPattern, #{}). + +delete(PathPattern, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:delete_many(?STORE_ID, PathPattern, Options1). + +adv_delete(PathPattern) -> + adv_delete(PathPattern, #{}). + +adv_delete(PathPattern, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri_adv:delete_many(?STORE_ID, PathPattern, Options1). + +clear_payload(PathPattern) -> + clear_payload(PathPattern, #{}). + +clear_payload(PathPattern, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:clear_payload(?STORE_ID, PathPattern, Options1). transaction(Fun) -> transaction(Fun, auto, #{}). @@ -1053,53 +1277,36 @@ transaction(Fun, ReadWrite) -> transaction(Fun, ReadWrite, Options) -> Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options1) of - ok -> ok; - {ok, Result} -> Result; + ok -> ok; %% Async transaction. + {ok, Result} -> Result; {error, Reason} -> throw({error, Reason}) end. -clear_store() -> - khepri:delete_many(?STORE_ID, "*", ?DEFAULT_COMMAND_OPTIONS). - -info() -> - ok = setup(), - khepri:info(?STORE_ID). +fence(Timeout) -> + khepri:fence(?STORE_ID, Timeout). handle_async_ret(RaEvent) -> khepri:handle_async_ret(?STORE_ID, RaEvent). -fence(Timeout) -> - khepri:fence(?STORE_ID, Timeout). +%% `delete_or_fail/1' is not a proxy to a Khepri function. --spec unregister_legacy_projections() -> Ret when - Ret :: ok | timeout_error(). -%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x -%% versions. -%% -%% In 3.13.x until 3.13.8 we mistakenly registered these projections even if -%% Khepri was not enabled. This function is used by the `khepri_db' enable -%% callback to remove those projections before we register the ones necessary -%% for 4.0.x. -%% -%% @private +delete_or_fail(Path) -> + %% `Path' must not be a pattern. + case khepri_adv:delete(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS) of + {ok, #{Path := NodeProps}} -> + case maps:size(NodeProps) of + 0 -> {error, {node_not_found, #{}}}; + _ -> ok + end; + {ok, #{} = NodePropsMap} when NodePropsMap =:= #{} -> + {error, {node_not_found, #{}}}; + {error, _} = Error -> + Error + end. -unregister_legacy_projections() -> - %% Note that we don't use `all' since `khepri_mnesia_migration' also - %% creates a projection table which we don't want to unregister. Instead - %% we list all of the legacy projection names: - LegacyNames = [ - rabbit_khepri_exchange, - rabbit_khepri_queue, - rabbit_khepri_vhost, - rabbit_khepri_users, - rabbit_khepri_global_rtparams, - rabbit_khepri_per_vhost_rtparams, - rabbit_khepri_user_permissions, - rabbit_khepri_bindings, - rabbit_khepri_index_route, - rabbit_khepri_topic_trie - ], - khepri:unregister_projections(?STORE_ID, LegacyNames). +%% ------------------------------------------------------------------- +%% Projections setup. +%% ------------------------------------------------------------------- register_projections() -> RegFuns = [fun register_rabbit_exchange_projection/0, @@ -1421,33 +1628,6 @@ follow_down_update(Table, Exchange, LeafNodeId, [], UpdateFn) -> keep end. -retry_khepri_op(Fun, 0) -> - Fun(); -retry_khepri_op(Fun, N) -> - case Fun() of - {error, {no_more_servers_to_try, Reasons}} = Err -> - case lists:member({error,cluster_change_not_permitted}, Reasons) of - true -> - timer:sleep(1000), - retry_khepri_op(Fun, N - 1); - false -> - Err - end; - {no_more_servers_to_try, Reasons} = Err -> - case lists:member({error,cluster_change_not_permitted}, Reasons) of - true -> - timer:sleep(1000), - retry_khepri_op(Fun, N - 1); - false -> - Err - end; - {error, cluster_change_not_permitted} -> - timer:sleep(1000), - retry_khepri_op(Fun, N - 1); - Any -> - Any - end. - %% ------------------------------------------------------------------- %% Mnesia->Khepri migration code. %% ------------------------------------------------------------------- @@ -1580,7 +1760,7 @@ khepri_db_migration_post_enable( FeatureName :: rabbit_feature_flags:feature_name(), Ret :: ok | {error, Reason}, Reason :: any(). -%% @doc Initializes the Khepri cluster based on the Mnesia cluster. +%% @doc Initialises the Khepri cluster based on the Mnesia cluster. %% %% It uses the `khepri_mnesia_migration' application to synchronize membership %% between both cluster. @@ -1754,6 +1934,36 @@ do_migrate_mnesia_tables(FeatureName, Migrations) -> {error, {migration_failure, Error}} end. +-spec unregister_legacy_projections() -> Ret when + Ret :: ok | timeout_error(). +%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x +%% versions. +%% +%% In 3.13.x until 3.13.8 we mistakenly registered these projections even if +%% Khepri was not enabled. This function is used by the `khepri_db' enable +%% callback to remove those projections before we register the ones necessary +%% for 4.0.x. +%% +%% @private + +unregister_legacy_projections() -> + %% Note that we don't use `all' since `khepri_mnesia_migration' also + %% creates a projection table which we don't want to unregister. Instead + %% we list all of the legacy projection names: + LegacyNames = [ + rabbit_khepri_exchange, + rabbit_khepri_queue, + rabbit_khepri_vhost, + rabbit_khepri_users, + rabbit_khepri_global_rtparams, + rabbit_khepri_per_vhost_rtparams, + rabbit_khepri_user_permissions, + rabbit_khepri_bindings, + rabbit_khepri_index_route, + rabbit_khepri_topic_trie + ], + khepri:unregister_projections(?STORE_ID, LegacyNames). + -spec handle_fallback(Funs) -> Ret when Funs :: #{mnesia := Fun, khepri := Fun | Ret}, Fun :: fun(() -> Ret), diff --git a/deps/rabbit/test/metadata_store_phase1_SUITE.erl b/deps/rabbit/test/metadata_store_phase1_SUITE.erl index 051e2c9c5d6d..becc8990fac8 100644 --- a/deps/rabbit/test/metadata_store_phase1_SUITE.erl +++ b/deps/rabbit/test/metadata_store_phase1_SUITE.erl @@ -272,7 +272,7 @@ end_per_testcase(Testcase, Config) -> TableDefs), %% Clear all data in Khepri. - ok = rabbit_khepri:clear_store(), + ok = rabbit_khepri:delete("*"), rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -2719,4 +2719,4 @@ check_storage(khepri, none, Content) -> rabbit_khepri:info(), Path = [#if_all{conditions = [?KHEPRI_WILDCARD_STAR_STAR, #if_has_data{}]}], - ?assertEqual({ok, Content}, rabbit_khepri:match(Path)). + ?assertEqual({ok, Content}, rabbit_khepri:get_many(Path)). diff --git a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl index 5c3221febc0d..b7e0911f9832 100644 --- a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl +++ b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl @@ -265,6 +265,6 @@ child_id_format(Config) -> ?assertMatch( {ok, #{Path := _}}, rabbit_ct_broker_helpers:rpc( - Config, NewNode, rabbit_khepri, list, - [Pattern])) + Config, NewNode, rabbit_khepri, get_many, + [Pattern ++ [?KHEPRI_WILDCARD_STAR]])) end.