
Commit 6870cf1

rabbit_khepri: "fence" during init/1
`khepri:fence/0,1,2` queries the leader's Raft index and blocks the caller, for up to the given (or default) timeout, until the local member has caught up in log replication to that index. We want to do this during Khepri init to ensure that the local Khepri store is reasonably up to date before continuing with the boot process and starting listeners. This is conceptually similar to the call to `mnesia:wait_for_tables/2` during `rabbit_mnesia:init/0` and should have the same effect.
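For illustration only, here is a minimal sketch of the call pattern the commit relies on. The store id `my_store` and the 30-second timeout are placeholders for this example, not values taken from this commit; the actual code below uses `?STORE_ID` and `retry_timeout()`.

    %% Block until the local Khepri store member has replicated up to the
    %% leader's current Raft index, or return an error once Timeout elapses.
    wait_for_local_catch_up() ->
        StoreId = my_store,       %% placeholder store id
        Timeout = 30000,          %% placeholder timeout in milliseconds
        case khepri:fence(StoreId, Timeout) of
            ok ->
                ok;               %% caught up; safe to continue booting
            {error, _} = Error ->
                Error             %% e.g. timed out waiting for replication
        end.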
1 parent 56f4489 commit 6870cf1

1 file changed (+27, -18 lines)

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 27 additions & 18 deletions
@@ -301,24 +301,30 @@ init(IsVirgin) ->
             ?LOG_NOTICE(
               "Found the following metadata store members: ~p", [Members],
               #{domain => ?RMQLOG_DOMAIN_DB}),
-            Ret = case IsVirgin of
-                      true ->
-                          register_projections();
-                      false ->
-                          ok
-                  end,
-            case Ret of
-                ok ->
-                    %% Delete transient queues on init.
-                    %% Note that we also do this in the
-                    %% `rabbit_amqqueue:on_node_down/1' callback. We must try
-                    %% this deletion during init because the cluster may have
-                    %% been in a minority when this node went down. We wait for
-                    %% a majority while registering projections above
-                    %% though so this deletion is likely to succeed.
-                    rabbit_amqqueue:delete_transient_queues_on_node(node());
-                {error, _} = Error ->
-                    Error
+            maybe
+                ?LOG_DEBUG(
+                  "Khepri-based " ?RA_FRIENDLY_NAME " catching up on "
+                  "replication to the Raft cluster leader", [],
+                  #{domain => ?RMQLOG_DOMAIN_DB}),
+                ok ?= fence(retry_timeout()),
+                ?LOG_DEBUG(
+                  "local Khepri-based " ?RA_FRIENDLY_NAME " member is caught "
+                  "up to the Raft cluster leader", [],
+                  #{domain => ?RMQLOG_DOMAIN_DB}),
+                ok ?= case IsVirgin of
+                          true ->
+                              register_projections();
+                          false ->
+                              ok
+                      end,
+                %% Delete transient queues on init.
+                %% Note that we also do this in the
+                %% `rabbit_amqqueue:on_node_down/1' callback. We must try this
+                %% deletion during init because the cluster may have been in a
+                %% minority when this node went down. We wait for a majority
+                %% while registering projections above though so this deletion
+                %% is likely to succeed.
+                rabbit_amqqueue:delete_transient_queues_on_node(node())
             end
         end.

@@ -1044,6 +1050,9 @@ info() ->
 handle_async_ret(RaEvent) ->
     khepri:handle_async_ret(?STORE_ID, RaEvent).
 
+fence(Timeout) ->
+    khepri:fence(?STORE_ID, Timeout).
+
 %% -------------------------------------------------------------------
 %% collect_payloads().
 %% -------------------------------------------------------------------
