Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 80 additions & 13 deletions deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
normalize/1,
append_node_prefix/1,
node_prefix/0]).
-export([do_query_node_props/2]).
-export([do_query_node_props/2,
get_local_start_time/0]).

-ifdef(TEST).
-export([query_node_props/1,
Expand Down Expand Up @@ -614,7 +615,7 @@ query_node_props2([{Node, Members} | Rest], NodesAndProps, FromNode) ->
"queries properties from node '~ts'",
[node(), Node]], FromNode),
StartTime = get_node_start_time(
Node, microsecond, FromNode),
Node, FromNode),
IsReady = is_node_db_ready(Node, FromNode),
NodeAndProps = {Node, Members, StartTime, IsReady},
NodesAndProps1 = [NodeAndProps | NodesAndProps],
Expand Down Expand Up @@ -642,9 +643,8 @@ query_node_props2([], NodesAndProps, _FromNode) ->
?assert(length(NodesAndProps1) =< length(nodes(hidden))),
NodesAndProps1.

-spec get_node_start_time(Node, Unit, FromNode) -> StartTime when
-spec get_node_start_time(Node, FromNode) -> StartTime when
Node :: node(),
Unit :: erlang:time_unit(),
FromNode :: node(),
StartTime :: non_neg_integer().
%% @doc Returns the start time of the given `Node' in `Unit'.
Expand All @@ -663,17 +663,84 @@ query_node_props2([], NodesAndProps, _FromNode) ->
%% https://www.erlang.org/doc/man/erlang#time_offset-0 to get the full
%% explanation of the computation.
%%
%% We also cache the computed start time. The reason is that since Erlang 26,
%% the time offset its volatile by default. Therefore we may compute a
%% different value each time. Caching the computed value gives up stability
%% again.
%%
%% @private

get_node_start_time(Node, Unit, FromNode) ->
NativeStartTime = erpc_call(
Node, erlang, system_info, [start_time], FromNode),
TimeOffset = erpc_call(Node, erlang, time_offset, [], FromNode),
SystemStartTime = NativeStartTime + TimeOffset,
StartTime = erpc_call(
Node, erlang, convert_time_unit,
[SystemStartTime, native, Unit], FromNode),
StartTime.
-define(START_TIME_KEY, rabbitmq_node_start_time).
-define(START_TIME_UNIT, microsecond).

get_node_start_time(Node, FromNode) ->
try
erpc_call(Node, ?MODULE, get_local_start_time, [], FromNode)
catch
error:{exception, undef, [{?MODULE, _, _, _} | _]} ->
get_node_start_time__v1(Node, FromNode)
end.

get_node_start_time__v1(Node, FromNode) ->
LockId = {?START_TIME_KEY, self()},
true = global:set_lock(LockId, [Node]),
try
CachedStartTime = erpc_call(
Node, persistent_term, get,
[?START_TIME_KEY, undefined], FromNode),
case CachedStartTime of
undefined ->
NativeStartTime = erpc_call(
Node, erlang, system_info,
[start_time], FromNode),
TimeOffset = erpc_call(
Node, erlang, time_offset, [], FromNode),
SystemStartTime = NativeStartTime + TimeOffset,
StartTime = erpc_call(
Node, erlang, convert_time_unit,
[SystemStartTime, native, ?START_TIME_UNIT],
FromNode),
erpc_call(
Node, persistent_term, put, [?START_TIME_KEY, StartTime],
FromNode),
StartTime;
_ ->
CachedStartTime
end
after
global:del_lock(LockId, [Node])
end.

%% The function below is exactly the same code as above but with meant to run
%% locally. This allows this node to compute its start time and cache it in
%% advance.

get_local_start_time() ->
Node = node(),
LockId = {?START_TIME_KEY, self()},
true = global:set_lock(LockId, [Node]),
try
CachedStartTime = persistent_term:get(?START_TIME_KEY, undefined),
case CachedStartTime of
undefined ->
NativeStartTime = erlang:system_info(start_time),
TimeOffset = erlang:time_offset(),
SystemStartTime = NativeStartTime + TimeOffset,
StartTime = erlang:convert_time_unit(
SystemStartTime, native, ?START_TIME_UNIT),

?LOG_DEBUG(
"Peer discovery: cache node start time (~b)",
[StartTime],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
persistent_term:put(?START_TIME_KEY, StartTime),
StartTime;
_ ->
CachedStartTime
end
after
global:del_lock(LockId, [Node])
end.

-spec is_node_db_ready(Node, FromNode) -> IsReady when
Node :: node(),
Expand Down
Loading