Skip to content

Commit ce72903

Browse files
committed
rabbit_khepri: "fence" during init/1
`khepri:fence/0,1,2` queries the leader's Raft index and blocks the caller for the given (or default) timeout until the local member has caught up in log replication to that index. We want to do this during Khepri init to ensure that the local Khepri store is reasonably up to date before continuing in the boot process and starting listeners. This is conceptually similar to the call to `mnesia:wait_for_tables/2` during `rabbit_mnesia:init/0` and should have the same effect.
1 parent e8d2675 commit ce72903

File tree

1 file changed

+27
-18
lines changed

1 file changed

+27
-18
lines changed

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 27 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -304,24 +304,30 @@ init(IsVirgin) ->
             ?LOG_NOTICE(
               "Found the following metadata store members: ~p", [Members],
               #{domain => ?RMQLOG_DOMAIN_DB}),
-            Ret = case IsVirgin of
-                      true ->
-                          register_projections();
-                      false ->
-                          ok
-                  end,
-            case Ret of
-                ok ->
-                    %% Delete transient queues on init.
-                    %% Note that we also do this in the
-                    %% `rabbit_amqqueue:on_node_down/1' callback. We must try
-                    %% this deletion during init because the cluster may have
-                    %% been in a minority when this node went down. We wait for
-                    %% a majority while registering projections above
-                    %% though so this deletion is likely to succeed.
-                    rabbit_amqqueue:delete_transient_queues_on_node(node());
-                {error, _} = Error ->
-                    Error
+            maybe
+                ?LOG_DEBUG(
+                  "Khepri-based " ?RA_FRIENDLY_NAME " catching up on "
+                  "replication to the Raft cluster leader", [],
+                  #{domain => ?RMQLOG_DOMAIN_DB}),
+                ok ?= fence(retry_timeout()),
+                ?LOG_DEBUG(
+                  "local Khepri-based " ?RA_FRIENDLY_NAME " member is caught "
+                  "up to the Raft cluster leader", [],
+                  #{domain => ?RMQLOG_DOMAIN_DB}),
+                ok ?= case IsVirgin of
+                          true ->
+                              register_projections();
+                          false ->
+                              ok
+                      end,
+                %% Delete transient queues on init.
+                %% Note that we also do this in the
+                %% `rabbit_amqqueue:on_node_down/1' callback. We must try this
+                %% deletion during init because the cluster may have been in a
+                %% minority when this node went down. We wait for a majority
+                %% while registering projections above though so this deletion
+                %% is likely to succeed.
+                rabbit_amqqueue:delete_transient_queues_on_node(node())
             end
         end.

@@ -1056,6 +1062,9 @@ info() ->
 handle_async_ret(RaEvent) ->
     khepri:handle_async_ret(?STORE_ID, RaEvent).

+fence(Timeout) ->
+    khepri:fence(?STORE_ID, Timeout).
+
 %% -------------------------------------------------------------------
 %% collect_payloads().
 %% -------------------------------------------------------------------

0 commit comments

Comments (0)