Skip to content

Commit 3923612

Browse files
committed
Enable async_dist when processing send_msg effects
for remote nodes. This will allow users to simplify their Ra client implementations not to have to handle lost messages as could occur when the distribution buffer fills.
1 parent cc54084 commit 3923612

File tree

2 files changed

+23
-16
lines changed

2 files changed

+23
-16
lines changed

src/ra_server_proc.erl

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,19 @@
7272
-define(HANDLE_EFFECTS(Effects, EvtType, State0),
7373
handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)).
7474

75+
-define(ASYNC_DIST(Node, Send),
76+
case Node == node() of
77+
true ->
78+
Send,
79+
ok;
80+
false ->
81+
%% use async_dist for remote sends
82+
process_flag(async_dist, true),
83+
Send,
84+
process_flag(async_dist, false),
85+
ok
86+
end).
87+
7588
-type query_fun() :: ra:query_fun().
7689
-type query_options() :: #{condition => ra:query_condition()}.
7790

@@ -1349,8 +1362,8 @@ handle_effect(_, {next_event, _, _} = Next, _, State, Actions) ->
13491362
{State, [Next | Actions]};
13501363
handle_effect(leader, {send_msg, To, Msg}, _, State, Actions) ->
13511364
%% default is to send without any wrapping
1352-
%% TODO: handle send failure? how?
1353-
_ = send(To, Msg, State#state.conf),
1365+
ToNode = get_node(To),
1366+
?ASYNC_DIST(ToNode, _ = send(To, Msg, State#state.conf)),
13541367
{State, Actions};
13551368
handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
13561369
_, State, Actions) ->
@@ -1359,13 +1372,13 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
13591372
true ->
13601373
case can_execute_locally(RaftState, ToNode, State) of
13611374
true ->
1362-
send_msg(Eff, State);
1375+
?ASYNC_DIST(ToNode, send_msg(Eff, State));
13631376
false ->
13641377
ok
13651378
end;
13661379
false when RaftState == leader ->
13671380
%% the effect got here so we can execute
1368-
send_msg(Eff, State);
1381+
?ASYNC_DIST(ToNode, send_msg(Eff, State));
13691382
false ->
13701383
ok
13711384
end,
@@ -1978,20 +1991,17 @@ handle_tick_metrics(State) ->
19781991

19791992
can_execute_locally(RaftState, TargetNode,
19801993
#state{server_state = ServerState} = State) ->
1981-
Membership = ra_server:get_membership(ServerState),
19821994
case RaftState of
1983-
_ when RaftState =/= leader andalso
1984-
Membership == voter ->
1985-
TargetNode == node();
19861995
leader when TargetNode =/= node() ->
19871996
%% We need to evaluate whether to send the message.
19881997
%% Only send if there isn't a local node for the target pid.
19891998
Members = do_state_query(voters, State),
19901999
not lists:any(fun ({_, N}) -> N == TargetNode end, Members);
19912000
leader ->
19922001
true;
1993-
_ ->
1994-
false
2002+
_ when RaftState =/= leader ->
2003+
TargetNode == node() andalso
2004+
voter == ra_server:get_membership(ServerState)
19952005
end.
19962006

19972007
can_execute_on_member(_RaftState, Member,

test/ra_SUITE.erl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,16 @@ end_per_testcase(_TestCase, Config) ->
128128
Config.
129129

130130
single_server_processes_command(Config) ->
131-
% ok = logger:set_primary_config(level, all),
132131
Name = ?config(test_name, Config),
133-
N1 = nth_server_name(Config, 1),
132+
{_RaName, _} = N1 = nth_server_name(Config, 1),
134133
ok = ra:start_server(default, Name, N1, add_machine(), []),
135134
ok = ra:trigger_election(N1),
136135
monitor(process, element(1, N1)),
137136
% index is 2 as leaders commit a no-op entry on becoming leaders
138-
% debugger:start(),
139-
% int:i(ra_server_proc),
140-
% int:break(ra_server_proc, 440),
141137
{ok, 5, _} = ra:process_command(N1, 5, 2000),
142138
{ok, 10, _} = ra:process_command(N1, 5, 2000),
143-
terminate_cluster([N1]).
139+
terminate_cluster([N1]),
140+
ok.
144141

145142
pipeline_commands(Config) ->
146143
Name = ?config(test_name, Config),

0 commit comments

Comments
 (0)