Skip to content

Commit 56f4489

Browse files
committed
rabbit_khepri: Register projections during virgin init/1
This covers a specific case where we need to register projections not covered by the enable callback of the `khepri_db` feature flag. The feature flag may be enabled if a node has been part of a cluster which enabled the flag, but the metadata store might be reset. Upon init the feature flag will be enabled but the store will be empty and the projections will not exist, so operations like inserting default data will fail when asserting that a vhost exists for example. This fixes the `cluster_management_SUITE:forget_cluster_node_in_khepri/1` case when running the suite with `RABBITMQ_METADATA_STORE=khepri`, which fails as mentioned above. We could run projection registration always when using Khepri but once projections are registered the command is idempotent so there's no need to, and the commands are somewhat large.
1 parent b497880 commit 56f4489

File tree

2 files changed

+29
-17
lines changed

2 files changed

+29
-17
lines changed

deps/rabbit/src/rabbit_db.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ init() ->
6767
end,
6868

6969
Ret = case rabbit_khepri:is_enabled() of
70-
true -> init_using_khepri();
71-
false -> init_using_mnesia()
70+
true -> init_using_khepri(IsVirgin);
71+
false -> init_using_mnesia(IsVirgin)
7272
end,
7373
case Ret of
7474
ok ->
@@ -91,19 +91,19 @@ pre_init(IsVirgin) ->
9191
OtherMembers = rabbit_nodes:nodes_excl_me(Members),
9292
rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin).
9393

94-
init_using_mnesia() ->
94+
init_using_mnesia(_IsVirgin) ->
9595
?LOG_DEBUG(
9696
"DB: initialize Mnesia",
9797
#{domain => ?RMQLOG_DOMAIN_DB}),
9898
ok = rabbit_mnesia:init(),
9999
?assertEqual(rabbit:data_dir(), mnesia_dir()),
100100
rabbit_sup:start_child(mnesia_sync).
101101

102-
init_using_khepri() ->
102+
init_using_khepri(IsVirgin) ->
103103
?LOG_DEBUG(
104104
"DB: initialize Khepri",
105105
#{domain => ?RMQLOG_DOMAIN_DB}),
106-
rabbit_khepri:init().
106+
rabbit_khepri:init(IsVirgin).
107107

108108
init_finished() ->
109109
%% Used during initialisation by rabbit_logger_exchange_h.erl

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
-export([setup/0,
100100
setup/1,
101101
register_projections/0,
102-
init/0,
102+
init/1,
103103
can_join_cluster/1,
104104
add_member/2,
105105
remove_member/1,
@@ -288,26 +288,38 @@ retry_timeout() ->
288288

289289
%% @private
290290

291-
-spec init() -> Ret when
291+
-spec init(IsVirgin) -> Ret when
292+
IsVirgin :: boolean(),
292293
Ret :: ok | timeout_error().
293294

294-
init() ->
295+
init(IsVirgin) ->
295296
case members() of
296297
[] ->
297298
timer:sleep(1000),
298-
init();
299+
init(IsVirgin);
299300
Members ->
300301
?LOG_NOTICE(
301302
"Found the following metadata store members: ~p", [Members],
302303
#{domain => ?RMQLOG_DOMAIN_DB}),
303-
%% Delete transient queues on init.
304-
%% Note that we also do this in the
305-
%% `rabbit_amqqueue:on_node_down/1' callback. We must try this
306-
%% deletion during init because the cluster may have been in a
307-
%% minority when this node went down. We wait for a majority while
308-
%% booting (via `rabbit_khepri:setup/0') though so this deletion is
309-
%% likely to succeed.
310-
rabbit_amqqueue:delete_transient_queues_on_node(node())
304+
Ret = case IsVirgin of
305+
true ->
306+
register_projections();
307+
false ->
308+
ok
309+
end,
310+
case Ret of
311+
ok ->
312+
%% Delete transient queues on init.
313+
%% Note that we also do this in the
314+
%% `rabbit_amqqueue:on_node_down/1' callback. We must try
315+
%% this deletion during init because the cluster may have
316+
%% been in a minority when this node went down. We wait for
317+
%% a majority while registering projections above
318+
%% though so this deletion is likely to succeed.
319+
rabbit_amqqueue:delete_transient_queues_on_node(node());
320+
{error, _} = Error ->
321+
Error
322+
end
311323
end.
312324

313325
%% @private

0 commit comments

Comments
 (0)