Skip to content

Commit e8d2675

Browse files
committed
rabbit_khepri: Register projections during virgin init/1
This covers a specific case where we need to register projections not covered by the enable callback of the `khepri_db` feature flag. The feature flag may be enabled if a node has been part of a cluster which enabled the flag, but the metadata store might be reset. Upon init the feature flag will be enabled but the store will be empty and the projections will not exist, so operations like inserting default data will fail when asserting that a vhost exists for example. This fixes the `cluster_management_SUITE:forget_cluster_node_in_khepri/1` case when running the suite with `RABBITMQ_METADATA_STORE=khepri`, which fails as mentioned above. We could run projection registration always when using Khepri but once projections are registered the command is idempotent so there's no need to, and the commands are somewhat large.
1 parent 67031e3 commit e8d2675

File tree

2 files changed

+29
-17
lines changed

2 files changed

+29
-17
lines changed

deps/rabbit/src/rabbit_db.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ init() ->
6767
end,
6868

6969
Ret = case rabbit_khepri:is_enabled() of
70-
true -> init_using_khepri();
71-
false -> init_using_mnesia()
70+
true -> init_using_khepri(IsVirgin);
71+
false -> init_using_mnesia(IsVirgin)
7272
end,
7373
case Ret of
7474
ok ->
@@ -91,19 +91,19 @@ pre_init(IsVirgin) ->
9191
OtherMembers = rabbit_nodes:nodes_excl_me(Members),
9292
rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin).
9393

94-
init_using_mnesia() ->
94+
init_using_mnesia(_IsVirgin) ->
9595
?LOG_DEBUG(
9696
"DB: initialize Mnesia",
9797
#{domain => ?RMQLOG_DOMAIN_DB}),
9898
ok = rabbit_mnesia:init(),
9999
?assertEqual(rabbit:data_dir(), mnesia_dir()),
100100
rabbit_sup:start_child(mnesia_sync).
101101

102-
init_using_khepri() ->
102+
init_using_khepri(IsVirgin) ->
103103
?LOG_DEBUG(
104104
"DB: initialize Khepri",
105105
#{domain => ?RMQLOG_DOMAIN_DB}),
106-
rabbit_khepri:init().
106+
rabbit_khepri:init(IsVirgin).
107107

108108
init_finished() ->
109109
%% Used during initialisation by rabbit_logger_exchange_h.erl

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@
101101
-export([setup/0,
102102
setup/1,
103103
register_projections/0,
104-
init/0,
104+
init/1,
105105
can_join_cluster/1,
106106
add_member/2,
107107
remove_member/1,
@@ -291,26 +291,38 @@ retry_timeout() ->
291291

292292
%% @private
293293

294-
-spec init() -> Ret when
294+
-spec init(IsVirgin) -> Ret when
295+
IsVirgin :: boolean(),
295296
Ret :: ok | timeout_error().
296297

297-
init() ->
298+
init(IsVirgin) ->
298299
case members() of
299300
[] ->
300301
timer:sleep(1000),
301-
init();
302+
init(IsVirgin);
302303
Members ->
303304
?LOG_NOTICE(
304305
"Found the following metadata store members: ~p", [Members],
305306
#{domain => ?RMQLOG_DOMAIN_DB}),
306-
%% Delete transient queues on init.
307-
%% Note that we also do this in the
308-
%% `rabbit_amqqueue:on_node_down/1' callback. We must try this
309-
%% deletion during init because the cluster may have been in a
310-
%% minority when this node went down. We wait for a majority while
311-
%% booting (via `rabbit_khepri:setup/0') though so this deletion is
312-
%% likely to succeed.
313-
rabbit_amqqueue:delete_transient_queues_on_node(node())
307+
Ret = case IsVirgin of
308+
true ->
309+
register_projections();
310+
false ->
311+
ok
312+
end,
313+
case Ret of
314+
ok ->
315+
%% Delete transient queues on init.
316+
%% Note that we also do this in the
317+
%% `rabbit_amqqueue:on_node_down/1' callback. We must try
318+
%% this deletion during init because the cluster may have
319+
%% been in a minority when this node went down. We wait for
320+
%% a majority while registering projections above
321+
%% though so this deletion is likely to succeed.
322+
rabbit_amqqueue:delete_transient_queues_on_node(node());
323+
{error, _} = Error ->
324+
Error
325+
end
314326
end.
315327

316328
%% @private

0 commit comments

Comments
 (0)