diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl index faa4dd28e6b3..6dd2ae7d01cf 100644 --- a/deps/rabbit/src/rabbit_db.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -67,8 +67,8 @@ init() -> end, Ret = case rabbit_khepri:is_enabled() of - true -> init_using_khepri(); - false -> init_using_mnesia() + true -> init_using_khepri(IsVirgin); + false -> init_using_mnesia(IsVirgin) end, case Ret of ok -> @@ -91,7 +91,7 @@ pre_init(IsVirgin) -> OtherMembers = rabbit_nodes:nodes_excl_me(Members), rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin). -init_using_mnesia() -> +init_using_mnesia(_IsVirgin) -> ?LOG_DEBUG( "DB: initialize Mnesia", #{domain => ?RMQLOG_DOMAIN_DB}), @@ -99,11 +99,11 @@ init_using_mnesia() -> ?assertEqual(rabbit:data_dir(), mnesia_dir()), rabbit_sup:start_child(mnesia_sync). -init_using_khepri() -> +init_using_khepri(IsVirgin) -> ?LOG_DEBUG( "DB: initialize Khepri", #{domain => ?RMQLOG_DOMAIN_DB}), - rabbit_khepri:init(). + rabbit_khepri:init(IsVirgin). init_finished() -> %% Used during initialisation by rabbit_logger_exchange_h.erl diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 913b4de80d5f..2c7c3d862c64 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -87,6 +87,8 @@ -module(rabbit_khepri). +-feature(maybe_expr, enable). + -include_lib("kernel/include/logger.hrl"). -include_lib("stdlib/include/assert.hrl"). @@ -98,7 +100,8 @@ -export([setup/0, setup/1, - init/0, + register_projections/0, + init/1, can_join_cluster/1, add_member/2, remove_member/1, @@ -267,7 +270,6 @@ setup(_) -> RetryTimeout = retry_timeout(), case khepri_cluster:wait_for_leader(?STORE_ID, RetryTimeout) of ok -> - wait_for_register_projections(), ?LOG_DEBUG( "Khepri-based " ?RA_FRIENDLY_NAME " ready", #{domain => ?RMQLOG_DOMAIN_GLOBAL}), @@ -287,49 +289,46 @@ retry_timeout() -> undefined -> 30000 end. -retry_limit() -> - case application:get_env(rabbit, khepri_leader_wait_retry_limit) of - {ok, T} -> T; - undefined -> 10 - end. - -wait_for_register_projections() -> - wait_for_register_projections(retry_timeout(), retry_limit()). - -wait_for_register_projections(_Timeout, 0) -> - exit(timeout_waiting_for_khepri_projections); -wait_for_register_projections(Timeout, Retries) -> - rabbit_log:info("Waiting for Khepri projections for ~tp ms, ~tp retries left", - [Timeout, Retries - 1]), - try - register_projections() - catch - throw : timeout -> - wait_for_register_projections(Timeout, Retries -1) - end. - %% @private --spec init() -> Ret when +-spec init(IsVirgin) -> Ret when + IsVirgin :: boolean(), Ret :: ok | timeout_error(). -init() -> +init(IsVirgin) -> case members() of [] -> timer:sleep(1000), - init(); + init(IsVirgin); Members -> ?LOG_NOTICE( "Found the following metadata store members: ~p", [Members], #{domain => ?RMQLOG_DOMAIN_DB}), - %% Delete transient queues on init. - %% Note that we also do this in the - %% `rabbit_amqqueue:on_node_down/1' callback. We must try this - %% deletion during init because the cluster may have been in a - %% minority when this node went down. We wait for a majority while - %% booting (via `rabbit_khepri:setup/0') though so this deletion is - %% likely to succeed. - rabbit_amqqueue:delete_transient_queues_on_node(node()) + maybe + ?LOG_DEBUG( + "Khepri-based " ?RA_FRIENDLY_NAME " catching up on " + "replication to the Raft cluster leader", [], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= fence(retry_timeout()), + ?LOG_DEBUG( + "local Khepri-based " ?RA_FRIENDLY_NAME " member is caught " + "up to the Raft cluster leader", [], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= case IsVirgin of + true -> + register_projections(); + false -> + ok + end, + %% Delete transient queues on init. + %% Note that we also do this in the + %% `rabbit_amqqueue:on_node_down/1' callback. We must try this + %% deletion during init because the cluster may have been in a + %% minority when this node went down. We wait for a majority + %% while registering projections above though so this deletion + %% is likely to succeed. + rabbit_amqqueue:delete_transient_queues_on_node(node()) + end end. %% @private @@ -1063,6 +1062,9 @@ info() -> handle_async_ret(RaEvent) -> khepri:handle_async_ret(?STORE_ID, RaEvent). +fence(Timeout) -> + khepri:fence(?STORE_ID, Timeout). + %% ------------------------------------------------------------------- %% collect_payloads(). %% ------------------------------------------------------------------- @@ -1105,6 +1107,27 @@ collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) -> Acc end, Acc0, Props). +-spec unregister_all_projections() -> Ret when + Ret :: ok | timeout_error(). + +unregister_all_projections() -> + %% Note that we don't use `all' since `khepri_mnesia_migration' also + %% creates a projection table which we don't want to unregister. Instead + %% we list all of the currently used projection names: + Names = [ + rabbit_khepri_exchange, + rabbit_khepri_queue, + rabbit_khepri_vhost, + rabbit_khepri_users, + rabbit_khepri_global_rtparams, + rabbit_khepri_per_vhost_rtparams, + rabbit_khepri_user_permissions, + rabbit_khepri_bindings, + rabbit_khepri_index_route, + rabbit_khepri_topic_trie + ], + khepri:unregister_projections(?STORE_ID, Names). + register_projections() -> RegFuns = [fun register_rabbit_exchange_projection/0, fun register_rabbit_queue_projection/0, @@ -1116,20 +1139,23 @@ register_projections() -> fun register_rabbit_bindings_projection/0, fun register_rabbit_index_route_projection/0, fun register_rabbit_topic_graph_projection/0], - [case RegisterFun() of - ok -> - ok; - %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` would - %% return `{error, exists}` for projections which already exist. - {error, exists} -> - ok; - %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. - {error, {khepri, projection_already_exists, _Info}} -> - ok; - {error, Error} -> - throw(Error) - end || RegisterFun <- RegFuns], - ok. + rabbit_misc:for_each_while_ok( + fun(RegisterFun) -> + case RegisterFun() of + ok -> + ok; + %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` + %% would return `{error, exists}` for projections which + %% already exist. + {error, exists} -> + ok; + %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. + {error, {khepri, projection_already_exists, _Info}} -> + ok; + {error, _} = Error -> + Error + end + end, RegFuns). register_rabbit_exchange_projection() -> Name = rabbit_khepri_exchange, @@ -1188,7 +1214,7 @@ register_rabbit_user_permissions_projection() -> register_simple_projection(Name, PathPattern, KeyPos) -> Options = #{keypos => KeyPos}, Projection = khepri_projection:new(Name, copy, Options), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_bindings_projection() -> MapFun = fun(_Path, Binding) -> @@ -1204,7 +1230,7 @@ register_rabbit_bindings_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_index_route_projection() -> MapFun = fun(Path, _) -> @@ -1236,7 +1262,7 @@ register_rabbit_index_route_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). %% Routing information is stored in the Khepri store as a `set'. %% In order to turn these bindings into records in an ETS `bag', we use a @@ -1337,7 +1363,7 @@ register_rabbit_topic_graph_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). -spec follow_down_update(Table, Exchange, Words, UpdateFn) -> Ret when Table :: ets:tid(), @@ -1515,9 +1541,11 @@ get_feature_state(Node) -> %% @private khepri_db_migration_enable(#{feature_name := FeatureName}) -> - case sync_cluster_membership_from_mnesia(FeatureName) of - ok -> migrate_mnesia_tables(FeatureName); - Error -> Error + maybe + ok ?= sync_cluster_membership_from_mnesia(FeatureName), + ok ?= unregister_all_projections(), + ok ?= register_projections(), + migrate_mnesia_tables(FeatureName) end. %% @private diff --git a/deps/rabbit/test/metadata_store_phase1_SUITE.erl b/deps/rabbit/test/metadata_store_phase1_SUITE.erl index 7e50445820f0..cf080d170ce1 100644 --- a/deps/rabbit/test/metadata_store_phase1_SUITE.erl +++ b/deps/rabbit/test/metadata_store_phase1_SUITE.erl @@ -192,6 +192,7 @@ setup_khepri(Config) -> %% Configure Khepri. It takes care of configuring Ra system & cluster. It %% uses the Mnesia directory to store files. ok = rabbit_khepri:setup(undefined), + ok = rabbit_khepri:register_projections(), ct:pal("Khepri info below:"), rabbit_khepri:info(), diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index da48bbcca895..0c362f872573 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -981,7 +981,7 @@ enable_khepri_metadata_store(Config, FFs0) -> case enable_feature_flag(C, FF) of ok -> C; - Skip -> + {skip, _} = Skip -> ct:pal("Enabling metadata store failed: ~p", [Skip]), Skip end