Skip to content

Commit 1656c61

Browse files
committed
Khepri: Filter running nodes when selecting a node to cluster with
[Why] Until now, the code that selected the node to use as the "entry point" for adding the local node to a remote cluster assumed that all cluster members were running: it picked the local node if it was listed, otherwise the first node in the cluster members list. If that node was stopped, the join failed immediately, even if the rest of the members were running fine. [How] The function now first filters out nodes that are unavailable or don't run the expected Khepri store. It then selects from the resulting list as before (the local node if present, otherwise the first entry). If all nodes are stopped or unreachable, it returns `{error, {no_nodes_to_cluster_with, Cluster}}`, which `add_member/2` passes back to its caller.
1 parent afa28cb commit 1656c61

File tree

1 file changed

+35
-13
lines changed

1 file changed

+35
-13
lines changed

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -373,20 +373,42 @@ add_member(JoiningNode, JoinedNode) when is_atom(JoinedNode) ->
373373
JoiningNode, rabbit_khepri, do_join, [JoinedNode]),
374374
post_add_member(JoiningNode, JoinedNode, Ret);
375375
add_member(JoiningNode, [_ | _] = Cluster) ->
376-
JoinedNode = pick_node_in_cluster(Cluster),
377-
?LOG_INFO(
378-
"Khepri clustering: Attempt to add node ~p to cluster ~0p "
379-
"through node ~p",
380-
[JoiningNode, Cluster, JoinedNode],
381-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
382-
%% Recurse with a single node taken in the `Cluster' list.
383-
add_member(JoiningNode, JoinedNode).
376+
case pick_node_in_cluster(Cluster) of
377+
{ok, JoinedNode} ->
378+
?LOG_INFO(
379+
"Khepri clustering: Attempt to add node ~p to cluster ~0p "
380+
"through node ~p",
381+
[JoiningNode, Cluster, JoinedNode],
382+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
383+
%% Recurse with a single node taken in the `Cluster' list.
384+
add_member(JoiningNode, JoinedNode);
385+
{error, _} = Error ->
386+
Error
387+
end.
384388

385-
pick_node_in_cluster([_ | _] = Cluster) when is_list(Cluster) ->
386-
ThisNode = node(),
387-
case lists:member(ThisNode, Cluster) of
388-
true -> ThisNode;
389-
false -> hd(Cluster)
389+
%% Picks the node through which the join will be attempted.
%%
%% `Cluster' must be a non-empty list of nodes. Each node is probed with
%% `khepri_cluster:is_store_running(?STORE_ID)' through `erpc:call/4'; nodes
%% for which the probe returns `true' are kept. Any exception raised by the
%% probe makes the node count as not running.
%%
%% Returns `{ok, Node}' where `Node' is the local node if it is among the
%% running nodes, otherwise the first running node in `Cluster' order.
%% Returns `{error, {no_nodes_to_cluster_with, Cluster}}' if no node passed
%% the probe.
%%
%% NOTE(review): no timeout argument is passed to `erpc:call/4'; confirm the
%% default is acceptable when a member is reachable but unresponsive.
%% NOTE(review): presumably `is_store_running/1' returns a boolean, as
%% `lists:filter/2' requires -- verify against the Khepri API.
pick_node_in_cluster([_ | _] = Cluster) ->
390+
RunningNodes = lists:filter(
391+
fun(Node) ->
392+
try
393+
erpc:call(
394+
Node,
395+
khepri_cluster, is_store_running,
396+
[?STORE_ID])
397+
catch
398+
_:_ -> % any exception class/reason: treat the node as not running
399+
false
400+
end
401+
end, Cluster),
402+
case RunningNodes of
403+
[_ | _] ->
404+
ThisNode = node(),
405+
SelectedNode = case lists:member(ThisNode, RunningNodes) of
406+
true -> ThisNode; % prefer the local node when it passed the probe
407+
false -> hd(RunningNodes) % otherwise the first running node, in `Cluster' order
408+
end,
409+
{ok, SelectedNode};
410+
[] -> % no node passed the probe
411+
{error, {no_nodes_to_cluster_with, Cluster}}
390412
end.
391413

392414
do_join(RemoteNode) when RemoteNode =/= node() ->

0 commit comments

Comments
 (0)