From 868b41e764dec90b14c064857c14fb90153bcacb Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Wed, 4 Sep 2024 12:10:48 -0400 Subject: [PATCH 01/15] rabbit_db_binding: Prefer khepri_tx_adv:delete_many/1 to delete bindings Currently we use a combination of `khepri_tx:get_many/1` and then either `khepri_tx:delete/1` or `khepri_tx:delete_many/1`. This isn't a functional change: switching to `khepri_tx_adv:delete_many/1` is essentially equivalent but performs the deletion and lookup all in one command and one traversal of the tree. This should improve performance when deleting many bindings in an exchange. --- deps/rabbit/src/rabbit_db_binding.erl | 28 +++++++++++---------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index d2cece80fabc..4b3fd102405e 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -835,9 +835,8 @@ delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = #if_has_data{}), - {ok, Bindings} = khepri_tx:get_many(Path), - ok = khepri_tx:delete_many(Path), - maps:fold(fun(_P, Set, Acc) -> + {ok, Bindings} = khepri_tx_adv:delete_many(Path), + maps:fold(fun(_P, #{data := Set}, Acc) -> sets:to_list(Set) ++ Acc end, [], Bindings). @@ -881,25 +880,20 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) -> OnlyDurable :: boolean(), Deletions :: rabbit_binding:deletions(). -delete_for_destination_in_khepri(DstName, OnlyDurable) -> - BindingsMap = match_destination_in_khepri(DstName), - maps:foreach(fun(K, _V) -> khepri_tx:delete(K) end, BindingsMap), - Bindings = maps:fold(fun(_, Set, Acc) -> +delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) -> + Pattern = khepri_route_path( + VHost, + _SrcName = ?KHEPRI_WILDCARD_STAR, + Kind, + Name, + _RoutingKey = ?KHEPRI_WILDCARD_STAR), + {ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern), + Bindings = maps:fold(fun(_, #{data := Set}, Acc) -> sets:to_list(Set) ++ Acc end, [], BindingsMap), rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4, lists:keysort(#binding.source, Bindings), OnlyDurable). -match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}) -> - Path = khepri_route_path( - VHost, - _SrcName = ?KHEPRI_WILDCARD_STAR, - Kind, - Name, - _RoutingKey = ?KHEPRI_WILDCARD_STAR), - {ok, Map} = khepri_tx:get_many(Path), - Map. - %% ------------------------------------------------------------------- %% delete_transient_for_destination_in_mnesia(). %% ------------------------------------------------------------------- From d19b9b6337639c69f066be6a7335b6a02660bc96 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Wed, 4 Sep 2024 12:13:19 -0400 Subject: [PATCH 02/15] Transactionally delete all exchanges during vhost deletion Currently we delete each exchange one-by-one which requires three commands: the delete itself plus a put and delete for a runtime parameter that acts as a lock to prevent a client from declaring an exchange while it's being deleted. The lock is unnecessary during vhost deletion because permissions are cleared for the vhost before any resources are deleted. We can use a transaction to delete all exchanges and bindings for a vhost in a single command against the Khepri store. This minimizes the number of commands we need to send against the store and therefore the latency of the deletion. In a quick test with a vhost containing only 10,000 exchanges (no bindings, queues, users, etc.), this is an order of magnitude speedup: the prior commit takes 22s to delete the vhost while with this commit the vhost is deleted in 2s. --- deps/rabbit/src/rabbit_db_exchange.erl | 64 ++++++++++++++++++++++++++ deps/rabbit/src/rabbit_exchange.erl | 13 +++++- deps/rabbit/src/rabbit_vhost.erl | 3 +- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 326534385bc5..f8c37a22428f 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -26,6 +26,7 @@ peek_serial/1, next_serial/1, delete/2, + delete_all/1, delete_serial/1, recover/1, match/1, @@ -657,6 +658,69 @@ delete_in_khepri(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSour ok = khepri_tx:delete(khepri_exchange_path(XName)), rabbit_db_binding:delete_all_for_exchange_in_khepri(X, OnlyDurable, RemoveBindingsForSource). +%% ------------------------------------------------------------------- +%% delete_all(). +%% ------------------------------------------------------------------- + +-spec delete_all(VHostName) -> Ret when + VHostName :: vhost:name(), + Deletions :: rabbit_binding:deletions(), + Ret :: {ok, Deletions}. +%% @doc Deletes all exchanges for a given vhost. +%% +%% @returns an `{ok, Deletions}' tuple containing the {@link +%% rabbit_binding:deletions()} caused by deleting the exchanges under the given +%% vhost. +%% +%% @private + +delete_all(VHostName) -> + rabbit_khepri:handle_fallback( + #{mnesia => fun() -> delete_all_in_mnesia(VHostName) end, + khepri => fun() -> delete_all_in_khepri(VHostName) end + }). + +delete_all_in_mnesia(VHostName) -> + rabbit_mnesia:execute_mnesia_transaction( + fun() -> + delete_all_in_mnesia_tx(VHostName) + end). + +delete_all_in_mnesia_tx(VHostName) -> + Match = #exchange{name = rabbit_misc:r(VHostName, exchange), _ = '_'}, + Xs = mnesia:match_object(?MNESIA_TABLE, Match, write), + Deletions = + lists:foldl( + fun(X, Acc) -> + {deleted, #exchange{name = XName}, Bindings, XDeletions} = + unconditional_delete_in_mnesia( X, false), + XDeletions1 = rabbit_binding:add_deletion( + XName, {X, deleted, Bindings}, XDeletions), + rabbit_binding:combine_deletions(Acc, XDeletions1) + end, rabbit_binding:new_deletions(), Xs), + {ok, Deletions}. + +delete_all_in_khepri(VHostName) -> + rabbit_khepri:transaction( + fun() -> + delete_all_in_khepri_tx(VHostName) + end, rw, #{timeout => infinity}). + +delete_all_in_khepri_tx(VHostName) -> + Pattern = khepri_exchange_path(VHostName, ?KHEPRI_WILDCARD_STAR), + {ok, NodeProps} = khepri_tx_adv:delete_many(Pattern), + Deletions = + maps:fold( + fun(_Path, #{data := X}, Deletions) -> + {deleted, #exchange{name = XName}, Bindings, XDeletions} = + rabbit_db_binding:delete_all_for_exchange_in_khepri( + X, false, true), + Deletions1 = rabbit_binding:add_deletion( + XName, {X, deleted, Bindings}, XDeletions), + rabbit_binding:combine_deletions(Deletions, Deletions1) + end, rabbit_binding:new_deletions(), NodeProps), + {ok, Deletions}. + %% ------------------------------------------------------------------- %% delete_serial(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index 5a00d4de80da..b4037f9a8078 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -14,7 +14,7 @@ update_scratch/3, update_decorators/2, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, route/2, route/3, delete/3, validate_binding/2, count/0, - ensure_deleted/3]). + ensure_deleted/3, delete_all/2]). -export([list_names/0]). -export([serialise_events/1]). -export([serial/1, peek_serial/1]). @@ -484,6 +484,17 @@ delete(XName, IfUnused, Username) -> XName#resource.name, Username) end. +-spec delete_all(VHostName, ActingUser) -> Ret when + VHostName :: vhost:name(), + ActingUser :: rabbit_types:username(), + Ret :: ok. + +delete_all(VHostName, ActingUser) -> + {ok, Deletions} = rabbit_db_exchange:delete_all(VHostName), + Deletions1 = rabbit_binding:process_deletions(Deletions), + rabbit_binding:notify_deletions(Deletions1, ActingUser), + ok. + process_deletions({error, _} = E) -> E; process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) -> diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index 4da8fe1d6785..00c148e275ea 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -299,8 +299,7 @@ delete(VHost, ActingUser) -> assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser) end || Q <- rabbit_amqqueue:list(VHost)], rabbit_log:info("Deleting exchanges in vhost '~ts' because it's being deleted", [VHost]), - [ok = rabbit_exchange:ensure_deleted(Name, false, ActingUser) || - #exchange{name = Name} <- rabbit_exchange:list(VHost)], + ok = rabbit_exchange:delete_all(VHost, ActingUser), rabbit_log:info("Clearing policies and runtime parameters in vhost '~ts' because it's being deleted", [VHost]), _ = rabbit_runtime_parameters:clear_vhost(VHost, ActingUser), rabbit_log:debug("Removing vhost '~ts' from the metadata storage because it's being deleted", [VHost]), From 0c44d31cf9ce87166f73aa26e1aee8fd9e491030 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Wed, 4 Sep 2024 16:06:28 -0400 Subject: [PATCH 03/15] rabbitmq_event_exchange: Test for parameters in exchange deletion With the change in the parent commit we no longer set and clear a runtime parameter when deleting an exchange as part of vhost deletion. We need to adapt the `audit_vhost_internal_parameter` test case to test that the parameter is set and cleared when the exchange is deleted instead. --- .../test/system_SUITE.erl | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/deps/rabbitmq_event_exchange/test/system_SUITE.erl b/deps/rabbitmq_event_exchange/test/system_SUITE.erl index 3cd01a79e852..76d9199a586c 100644 --- a/deps/rabbitmq_event_exchange/test/system_SUITE.erl +++ b/deps/rabbitmq_event_exchange/test/system_SUITE.erl @@ -21,6 +21,7 @@ all() -> authentication, audit_queue, audit_exchange, + audit_exchange_internal_parameter, audit_binding, audit_vhost, audit_vhost_deletion, @@ -28,7 +29,6 @@ all() -> audit_connection, audit_direct_connection, audit_consumer, - audit_vhost_internal_parameter, audit_parameter, audit_policy, audit_vhost_limit, @@ -272,13 +272,19 @@ audit_consumer(Config) -> rabbit_ct_client_helpers:close_channel(Ch), ok. -audit_vhost_internal_parameter(Config) -> +audit_exchange_internal_parameter(Config) -> Ch = declare_event_queue(Config, <<"parameter.*">>), - User = <<"Bugs Bunny">>, - Vhost = <<"test-vhost">>, - rabbit_ct_broker_helpers:add_vhost(Config, 0, Vhost, User), - rabbit_ct_broker_helpers:delete_vhost(Config, 0, Vhost, User), + X = <<"exchange.audited-for-parameters">>, + #'exchange.declare_ok'{} = + amqp_channel:call(Ch, #'exchange.declare'{exchange = X, + type = <<"topic">>}), + #'exchange.delete_ok'{} = + amqp_channel:call(Ch, #'exchange.delete'{exchange = X}), + + User = proplists:get_value(rmq_username, Config), + %% Exchange deletion sets and clears a runtime parameter which acts as a + %% kind of lock: receive_user_in_event(<<"parameter.set">>, User), receive_user_in_event(<<"parameter.cleared">>, User), From ffec9f74ed07c89d0039e0be7171c1ce4e056b7f Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 09:42:32 -0400 Subject: [PATCH 04/15] rabbit_khepri: Avoid throws in `register_projection/0` Previously this function threw errors. With this minor refactor we return them instead so that `register_projection/0` is easier for callers to work with. (In the child commit we will add another caller.) --- deps/rabbit/src/rabbit_khepri.erl | 39 +++++++++++++++++-------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 913b4de80d5f..45eff76c088c 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -301,10 +301,10 @@ wait_for_register_projections(_Timeout, 0) -> wait_for_register_projections(Timeout, Retries) -> rabbit_log:info("Waiting for Khepri projections for ~tp ms, ~tp retries left", [Timeout, Retries - 1]), - try - register_projections() - catch - throw : timeout -> + case register_projections() of + ok -> + ok; + {error, timeout} -> wait_for_register_projections(Timeout, Retries -1) end. @@ -1116,20 +1116,23 @@ register_projections() -> fun register_rabbit_bindings_projection/0, fun register_rabbit_index_route_projection/0, fun register_rabbit_topic_graph_projection/0], - [case RegisterFun() of - ok -> - ok; - %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` would - %% return `{error, exists}` for projections which already exist. - {error, exists} -> - ok; - %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. - {error, {khepri, projection_already_exists, _Info}} -> - ok; - {error, Error} -> - throw(Error) - end || RegisterFun <- RegFuns], - ok. + rabbit_misc:for_each_while_ok( + fun(RegisterFun) -> + case RegisterFun() of + ok -> + ok; + %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` + %% would return `{error, exists}` for projections which + %% already exist. + {error, exists} -> + ok; + %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. + {error, {khepri, projection_already_exists, _Info}} -> + ok; + {error, _} = Error -> + Error + end + end, RegFuns). register_rabbit_exchange_projection() -> Name = rabbit_khepri_exchange, From a7712633c6c80234f4c7c66998486cc289a133fa Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 09:46:00 -0400 Subject: [PATCH 05/15] rabbit_khepri: Add projection registration to khepri_db ff enable fun --- deps/rabbit/src/rabbit_khepri.erl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 45eff76c088c..a719e3ba5dec 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -87,6 +87,8 @@ -module(rabbit_khepri). +-feature(maybe_expr, enable). + -include_lib("kernel/include/logger.hrl"). -include_lib("stdlib/include/assert.hrl"). @@ -1518,9 +1520,10 @@ get_feature_state(Node) -> %% @private khepri_db_migration_enable(#{feature_name := FeatureName}) -> - case sync_cluster_membership_from_mnesia(FeatureName) of - ok -> migrate_mnesia_tables(FeatureName); - Error -> Error + maybe + ok ?= sync_cluster_membership_from_mnesia(FeatureName), + ok ?= register_projections(), + migrate_mnesia_tables(FeatureName) end. %% @private From 3860c1e3f820f015059e88cdee021d2f1f1e70f3 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 09:46:23 -0400 Subject: [PATCH 06/15] rabbit_khepri: Unregister all projections when enabling khepri_db ff --- deps/rabbit/src/rabbit_khepri.erl | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index a719e3ba5dec..e6f939421dc1 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -1107,6 +1107,27 @@ collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) -> Acc end, Acc0, Props). +-spec unregister_all_projections() -> Ret when + Ret :: ok | timeout_error(). + +unregister_all_projections() -> + %% Note that we don't use `all' since `khepri_mnesia_migration' also + %% creates a projection table which we don't want to unregister. Instead + %% we list all of the currently used projection names: + Names = [ + rabbit_khepri_exchange, + rabbit_khepri_queue, + rabbit_khepri_vhost, + rabbit_khepri_users, + rabbit_khepri_global_rtparams, + rabbit_khepri_per_vhost_rtparams, + rabbit_khepri_user_permissions, + rabbit_khepri_bindings, + rabbit_khepri_index_route, + rabbit_khepri_topic_trie + ], + khepri:unregister_projections(?STORE_ID, Names). + register_projections() -> RegFuns = [fun register_rabbit_exchange_projection/0, fun register_rabbit_queue_projection/0, @@ -1522,6 +1543,7 @@ get_feature_state(Node) -> khepri_db_migration_enable(#{feature_name := FeatureName}) -> maybe ok ?= sync_cluster_membership_from_mnesia(FeatureName), + ok ?= unregister_all_projections(), ok ?= register_projections(), migrate_mnesia_tables(FeatureName) end. From 3e42bd57d2a862ef30b501de0f4e3d951677a342 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 09:47:14 -0400 Subject: [PATCH 07/15] rabbit_khepri: Remove projection registration from setup/0 --- deps/rabbit/src/rabbit_khepri.erl | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index e6f939421dc1..21db66cdaa3a 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -269,7 +269,6 @@ setup(_) -> RetryTimeout = retry_timeout(), case khepri_cluster:wait_for_leader(?STORE_ID, RetryTimeout) of ok -> - wait_for_register_projections(), ?LOG_DEBUG( "Khepri-based " ?RA_FRIENDLY_NAME " ready", #{domain => ?RMQLOG_DOMAIN_GLOBAL}), @@ -289,27 +288,6 @@ retry_timeout() -> undefined -> 30000 end. -retry_limit() -> - case application:get_env(rabbit, khepri_leader_wait_retry_limit) of - {ok, T} -> T; - undefined -> 10 - end. - -wait_for_register_projections() -> - wait_for_register_projections(retry_timeout(), retry_limit()). - -wait_for_register_projections(_Timeout, 0) -> - exit(timeout_waiting_for_khepri_projections); -wait_for_register_projections(Timeout, Retries) -> - rabbit_log:info("Waiting for Khepri projections for ~tp ms, ~tp retries left", - [Timeout, Retries - 1]), - case register_projections() of - ok -> - ok; - {error, timeout} -> - wait_for_register_projections(Timeout, Retries -1) - end. - %% @private -spec init() -> Ret when From 7c8cc61c3f8ee7610b6f867cfe6c2bd291be74f1 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 09:48:18 -0400 Subject: [PATCH 08/15] rabbit_khepri: Use `?STORE_ID` for projection registration functions This is a cosmetic change. `?RA_CLUSTER_NAME` is equivalent but is used for clustering commands. Commands sent via the `khepri`/`khepri_adv` APIs consistently use the `?STORE_ID` macro instead. --- deps/rabbit/src/rabbit_khepri.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 21db66cdaa3a..6fb4875edbbd 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -1192,7 +1192,7 @@ register_rabbit_user_permissions_projection() -> register_simple_projection(Name, PathPattern, KeyPos) -> Options = #{keypos => KeyPos}, Projection = khepri_projection:new(Name, copy, Options), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_bindings_projection() -> MapFun = fun(_Path, Binding) -> @@ -1208,7 +1208,7 @@ register_rabbit_bindings_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_index_route_projection() -> MapFun = fun(Path, _) -> @@ -1240,7 +1240,7 @@ register_rabbit_index_route_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). %% Routing information is stored in the Khepri store as a `set'. %% In order to turn these bindings into records in an ETS `bag', we use a @@ -1341,7 +1341,7 @@ register_rabbit_topic_graph_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). -spec follow_down_update(Table, Exchange, Words, UpdateFn) -> Ret when Table :: ets:tid(), From 564a562a078a5b0cdecded1f165efd61b9ac4aa3 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 09:49:38 -0400 Subject: [PATCH 09/15] Explicitly match skips when setting metadata store in CT This causes a clearer error when the `enable_feature_flags/2` function returns something not in the shape `ok | {skip, any()}`. --- deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index c230b63cf3a5..f3d556595802 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -992,7 +992,7 @@ enable_khepri_metadata_store(Config, FFs0) -> case enable_feature_flag(C, FF) of ok -> C; - Skip -> + {skip, _} = Skip -> ct:pal("Enabling metadata store failed: ~p", [Skip]), Skip end From c91c951344d2aa4118cd1866f1081c9ec3df0349 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 09:51:07 -0400 Subject: [PATCH 10/15] Ensure projections are registered in metadata_store_phase1_SUITE --- deps/rabbit/src/rabbit_khepri.erl | 1 + deps/rabbit/test/metadata_store_phase1_SUITE.erl | 1 + 2 files changed, 2 insertions(+) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 6fb4875edbbd..d2bbd53a7cff 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -100,6 +100,7 @@ -export([setup/0, setup/1, + register_projections/0, init/0, can_join_cluster/1, add_member/2, diff --git a/deps/rabbit/test/metadata_store_phase1_SUITE.erl b/deps/rabbit/test/metadata_store_phase1_SUITE.erl index 7e50445820f0..cf080d170ce1 100644 --- a/deps/rabbit/test/metadata_store_phase1_SUITE.erl +++ b/deps/rabbit/test/metadata_store_phase1_SUITE.erl @@ -192,6 +192,7 @@ setup_khepri(Config) -> %% Configure Khepri. It takes care of configuring Ra system & cluster. It %% uses the Mnesia directory to store files. ok = rabbit_khepri:setup(undefined), + ok = rabbit_khepri:register_projections(), ct:pal("Khepri info below:"), rabbit_khepri:info(), From 711af29c2446d40e29c0e166fc5b017bb3b52b21 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 09:55:34 -0400 Subject: [PATCH 11/15] rabbit_khepri: Register projections during virgin `init/1` This covers a specific case where we need to register projections not covered by the enable callback of the `khepri_db` feature flag. The feature flag may be enabled if a node has been part of a cluster which enabled the flag, but the metadata store might be reset. Upon init the feature flag will be enabled but the store will be empty and the projections will not exist, so operations like inserting default data will fail when asserting that a vhost exists for example. This fixes the `cluster_management_SUITE:forget_cluster_node_in_khepri/1` case when running the suite with `RABBITMQ_METADATA_STORE=khepri`, which fails as mentioned above. We could run projection registration always when using Khepri but once projections are registered the command is idempotent so there's no need to, and the commands are somewhat large. --- deps/rabbit/src/rabbit_db.erl | 10 ++++----- deps/rabbit/src/rabbit_khepri.erl | 36 ++++++++++++++++++++----------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl index faa4dd28e6b3..6dd2ae7d01cf 100644 --- a/deps/rabbit/src/rabbit_db.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -67,8 +67,8 @@ init() -> end, Ret = case rabbit_khepri:is_enabled() of - true -> init_using_khepri(); - false -> init_using_mnesia() + true -> init_using_khepri(IsVirgin); + false -> init_using_mnesia(IsVirgin) end, case Ret of ok -> @@ -91,7 +91,7 @@ pre_init(IsVirgin) -> OtherMembers = rabbit_nodes:nodes_excl_me(Members), rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin). -init_using_mnesia() -> +init_using_mnesia(_IsVirgin) -> ?LOG_DEBUG( "DB: initialize Mnesia", #{domain => ?RMQLOG_DOMAIN_DB}), @@ -99,11 +99,11 @@ init_using_mnesia() -> ?assertEqual(rabbit:data_dir(), mnesia_dir()), rabbit_sup:start_child(mnesia_sync). -init_using_khepri() -> +init_using_khepri(IsVirgin) -> ?LOG_DEBUG( "DB: initialize Khepri", #{domain => ?RMQLOG_DOMAIN_DB}), - rabbit_khepri:init(). + rabbit_khepri:init(IsVirgin). init_finished() -> %% Used during initialisation by rabbit_logger_exchange_h.erl diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index d2bbd53a7cff..18bb47cf6de4 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -101,7 +101,7 @@ -export([setup/0, setup/1, register_projections/0, - init/0, + init/1, can_join_cluster/1, add_member/2, remove_member/1, @@ -291,26 +291,38 @@ retry_timeout() -> %% @private --spec init() -> Ret when +-spec init(IsVirgin) -> Ret when + IsVirgin :: boolean(), Ret :: ok | timeout_error(). -init() -> +init(IsVirgin) -> case members() of [] -> timer:sleep(1000), - init(); + init(IsVirgin); Members -> ?LOG_NOTICE( "Found the following metadata store members: ~p", [Members], #{domain => ?RMQLOG_DOMAIN_DB}), - %% Delete transient queues on init. - %% Note that we also do this in the - %% `rabbit_amqqueue:on_node_down/1' callback. We must try this - %% deletion during init because the cluster may have been in a - %% minority when this node went down. We wait for a majority while - %% booting (via `rabbit_khepri:setup/0') though so this deletion is - %% likely to succeed. - rabbit_amqqueue:delete_transient_queues_on_node(node()) + Ret = case IsVirgin of + true -> + register_projections(); + false -> + ok + end, + case Ret of + ok -> + %% Delete transient queues on init. + %% Note that we also do this in the + %% `rabbit_amqqueue:on_node_down/1' callback. We must try + %% this deletion during init because the cluster may have + %% been in a minority when this node went down. We wait for + %% a majority while registering projections above + %% though so this deletion is likely to succeed. + rabbit_amqqueue:delete_transient_queues_on_node(node()); + {error, _} = Error -> + Error + end end. %% @private From 3df81166438ab898541ab34b4910aac8a1a5afe1 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 5 Sep 2024 16:11:13 -0400 Subject: [PATCH 12/15] rabbit_khepri: "fence" during `init/1` `khepri:fence/0,1,2` queries the leader's Raft index and blocks the caller for the given (or default) timeout until the local member has caught up in log replication to that index. We want to do this during Khepri init to ensure that the local Khepri store is reasonably up to date before continuing in the boot process and starting listeners. This is conceptually similar to the call to `mnesia:wait_for_tables/2` during `rabbit_mnesia:init/0` and should have the same effect. --- deps/rabbit/src/rabbit_khepri.erl | 45 ++++++++++++++++++------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 18bb47cf6de4..2c7c3d862c64 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -304,24 +304,30 @@ init(IsVirgin) -> ?LOG_NOTICE( "Found the following metadata store members: ~p", [Members], #{domain => ?RMQLOG_DOMAIN_DB}), - Ret = case IsVirgin of - true -> - register_projections(); - false -> - ok - end, - case Ret of - ok -> - %% Delete transient queues on init. - %% Note that we also do this in the - %% `rabbit_amqqueue:on_node_down/1' callback. We must try - %% this deletion during init because the cluster may have - %% been in a minority when this node went down. We wait for - %% a majority while registering projections above - %% though so this deletion is likely to succeed. - rabbit_amqqueue:delete_transient_queues_on_node(node()); - {error, _} = Error -> - Error + maybe + ?LOG_DEBUG( + "Khepri-based " ?RA_FRIENDLY_NAME " catching up on " + "replication to the Raft cluster leader", [], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= fence(retry_timeout()), + ?LOG_DEBUG( + "local Khepri-based " ?RA_FRIENDLY_NAME " member is caught " + "up to the Raft cluster leader", [], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= case IsVirgin of + true -> + register_projections(); + false -> + ok + end, + %% Delete transient queues on init. + %% Note that we also do this in the + %% `rabbit_amqqueue:on_node_down/1' callback. We must try this + %% deletion during init because the cluster may have been in a + %% minority when this node went down. We wait for a majority + %% while registering projections above though so this deletion + %% is likely to succeed. + rabbit_amqqueue:delete_transient_queues_on_node(node()) end end. @@ -1056,6 +1062,9 @@ info() -> handle_async_ret(RaEvent) -> khepri:handle_async_ret(?STORE_ID, RaEvent). +fence(Timeout) -> + khepri:fence(?STORE_ID, Timeout). + %% ------------------------------------------------------------------- %% collect_payloads(). %% ------------------------------------------------------------------- From 6bf0aac65d4b07082f153cad4e0a34af33c25549 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 10 Sep 2024 10:34:55 -0400 Subject: [PATCH 13/15] rabbit_khepri: Rename legacy projection unregistration function, add docs This function is meant to remove any projections which were mistakenly registered in 3.13.x rather than all existing projections. --- deps/rabbit/src/rabbit_khepri.erl | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 2c7c3d862c64..b79606ea2881 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -1107,14 +1107,23 @@ collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) -> Acc end, Acc0, Props). --spec unregister_all_projections() -> Ret when +-spec unregister_legacy_projections() -> Ret when Ret :: ok | timeout_error(). +%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x +%% versions. +%% +%% In 3.13.x until 3.13.8 we mistakenly registered these projections even if +%% Khepri was not enabled. This function is used by the `khepri_db' enable +%% callback to remove those projections before we register the ones necessary +%% for 4.0.x. +%% +%% @private -unregister_all_projections() -> +unregister_legacy_projections() -> %% Note that we don't use `all' since `khepri_mnesia_migration' also %% creates a projection table which we don't want to unregister. Instead - %% we list all of the currently used projection names: - Names = [ + %% we list all of the legacy projection names: + LegacyNames = [ rabbit_khepri_exchange, rabbit_khepri_queue, rabbit_khepri_vhost, @@ -1126,7 +1135,7 @@ unregister_all_projections() -> rabbit_khepri_index_route, rabbit_khepri_topic_trie ], - khepri:unregister_projections(?STORE_ID, Names). + khepri:unregister_projections(?STORE_ID, LegacyNames). register_projections() -> RegFuns = [fun register_rabbit_exchange_projection/0, @@ -1543,7 +1552,7 @@ get_feature_state(Node) -> khepri_db_migration_enable(#{feature_name := FeatureName}) -> maybe ok ?= sync_cluster_membership_from_mnesia(FeatureName), - ok ?= unregister_all_projections(), + ok ?= unregister_legacy_projections(), ok ?= register_projections(), migrate_mnesia_tables(FeatureName) end. From b992bc0be8ca484134b0f3cd6dd3419293a0c85b Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 10 Sep 2024 10:37:36 -0400 Subject: [PATCH 14/15] Consistently use singular names for Khepri projections Previously about half of the Khepri projection names were pluralized. --- deps/rabbit/src/rabbit_db_binding.erl | 2 +- deps/rabbit/src/rabbit_db_rtparams.erl | 4 ++-- deps/rabbit/src/rabbit_db_user.erl | 4 ++-- deps/rabbit/src/rabbit_khepri.erl | 10 +++++----- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index 4b3fd102405e..942b3a648110 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -53,7 +53,7 @@ -define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route). -define(MNESIA_REVERSE_TABLE, rabbit_reverse_route). -define(MNESIA_INDEX_TABLE, rabbit_index_route). --define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_bindings). +-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding). -define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_rtparams.erl b/deps/rabbit/src/rabbit_db_rtparams.erl index d241c72e540e..f57642ee953b 100644 --- a/deps/rabbit/src/rabbit_db_rtparams.erl +++ b/deps/rabbit/src/rabbit_db_rtparams.erl @@ -23,8 +23,8 @@ ]). -define(MNESIA_TABLE, rabbit_runtime_parameters). --define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparams). --define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparams). +-define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparam). +-define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparam). -define(any(Value), case Value of '_' -> ?KHEPRI_WILDCARD_STAR; _ -> Value diff --git a/deps/rabbit/src/rabbit_db_user.erl b/deps/rabbit/src/rabbit_db_user.erl index a717e69337b3..af72080be9c1 100644 --- a/deps/rabbit/src/rabbit_db_user.erl +++ b/deps/rabbit/src/rabbit_db_user.erl @@ -75,8 +75,8 @@ -define(MNESIA_TABLE, rabbit_user). -define(PERM_MNESIA_TABLE, rabbit_user_permission). -define(TOPIC_PERM_MNESIA_TABLE, rabbit_topic_permission). --define(KHEPRI_USERS_PROJECTION, rabbit_khepri_users). --define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permissions). +-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_user). +-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permission). %% ------------------------------------------------------------------- %% create(). diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index b79606ea2881..a412e80a8e85 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -1190,21 +1190,21 @@ register_rabbit_vhost_projection() -> register_simple_projection(Name, PathPattern, KeyPos). register_rabbit_users_projection() -> - Name = rabbit_khepri_users, + Name = rabbit_khepri_user, PathPattern = rabbit_db_user:khepri_user_path( _UserName = ?KHEPRI_WILDCARD_STAR), KeyPos = 2, %% #internal_user.username register_simple_projection(Name, PathPattern, KeyPos). register_rabbit_global_runtime_parameters_projection() -> - Name = rabbit_khepri_global_rtparams, + Name = rabbit_khepri_global_rtparam, PathPattern = rabbit_db_rtparams:khepri_global_rp_path( _Key = ?KHEPRI_WILDCARD_STAR_STAR), KeyPos = #runtime_parameters.key, register_simple_projection(Name, PathPattern, KeyPos). register_rabbit_per_vhost_runtime_parameters_projection() -> - Name = rabbit_khepri_per_vhost_rtparams, + Name = rabbit_khepri_per_vhost_rtparam, PathPattern = rabbit_db_rtparams:khepri_vhost_rp_path( _VHost = ?KHEPRI_WILDCARD_STAR_STAR, _Component = ?KHEPRI_WILDCARD_STAR_STAR, @@ -1213,7 +1213,7 @@ register_rabbit_per_vhost_runtime_parameters_projection() -> register_simple_projection(Name, PathPattern, KeyPos). register_rabbit_user_permissions_projection() -> - Name = rabbit_khepri_user_permissions, + Name = rabbit_khepri_user_permission, PathPattern = rabbit_db_user:khepri_user_permission_path( _UserName = ?KHEPRI_WILDCARD_STAR, _VHost = ?KHEPRI_WILDCARD_STAR), @@ -1232,7 +1232,7 @@ register_rabbit_bindings_projection() -> ProjectionFun = projection_fun_for_sets(MapFun), Options = #{keypos => #route.binding}, Projection = khepri_projection:new( - rabbit_khepri_bindings, ProjectionFun, Options), + rabbit_khepri_binding, ProjectionFun, Options), PathPattern = rabbit_db_binding:khepri_route_path( _VHost = ?KHEPRI_WILDCARD_STAR, _ExchangeName = ?KHEPRI_WILDCARD_STAR, From 2e7f1493b69eced110be4a3cb58d79d170ef976e Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 10 Sep 2024 13:00:25 -0400 Subject: [PATCH 15/15] rabbit_khepri: Add debug logs in khepri_db enable callback Without these there is no indication of unregistering and registering projections. --- deps/rabbit/src/rabbit_khepri.erl | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index a412e80a8e85..d8f35e990fba 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -1552,7 +1552,15 @@ get_feature_state(Node) -> khepri_db_migration_enable(#{feature_name := FeatureName}) -> maybe ok ?= sync_cluster_membership_from_mnesia(FeatureName), + ?LOG_INFO( + "Feature flag `~s`: unregistering legacy projections", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), ok ?= unregister_legacy_projections(), + ?LOG_INFO( + "Feature flag `~s`: registering projections", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), ok ?= register_projections(), migrate_mnesia_tables(FeatureName) end.