Skip to content

Commit cf9609f

Browse files
ra:start_cluster/3: handle timeouts
Closes #539. References rabbitmq/rabbitmq-server#13828.
1 parent: db2869a · commit: cf9609f

File tree

4 files changed: 63 additions (+63) and 42 deletions (-42).

src/ra.erl

Lines changed: 40 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -432,8 +432,7 @@ start_cluster(System, ServerConfigs)
432432
{error, cluster_not_formed}.
433433
start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs,
434434
Timeout) when is_atom(System) ->
435-
{Started, NotStarted} =
436-
ra_lib:partition_parallel(
435+
case ra_lib:partition_parallel(
437436
fun (C) ->
438437
case start_server(System, C) of
439438
ok -> true;
@@ -442,39 +441,46 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs,
442441
[C, Err]),
443442
false
444443
end
445-
end, ServerConfigs),
446-
case Started of
447-
[] ->
448-
?ERR("ra: failed to form a new cluster ~w. "
449-
"No servers were successfully started.",
450-
[ClusterName]),
451-
{error, cluster_not_formed};
452-
_ ->
444+
end, ServerConfigs) of
445+
{ok, Started, NotStarted} ->
446+
case Started of
447+
[] ->
448+
?ERR("ra: failed to form a new cluster ~w. "
449+
"No servers were successfully started.",
450+
[ClusterName]),
451+
{error, cluster_not_formed};
452+
_ ->
453+
StartedIds = sort_by_local([I || #{id := I} <- Started], []),
454+
NotStartedIds = [I || #{id := I} <- NotStarted],
455+
%% try triggering elections until one succeeds
456+
%% TODO: handle case where no election was successfully triggered
457+
{value, TriggeredId} = lists:search(fun (N) ->
458+
ok == trigger_election(N)
459+
end, StartedIds),
460+
%% the triggered id is likely to become the leader so try that first
461+
case members(TriggeredId,
462+
length(ServerConfigs) * Timeout) of
463+
{ok, _, Leader} ->
464+
?INFO("ra: started cluster ~ts with ~b servers. "
465+
"~b servers failed to start: ~w. Leader: ~w",
466+
[ClusterName, length(ServerConfigs),
467+
length(NotStarted), NotStartedIds,
468+
Leader]),
469+
% we have a functioning cluster
470+
{ok, StartedIds, NotStartedIds};
471+
Err ->
472+
?WARN("ra: failed to form new cluster ~w. "
473+
"Error: ~w", [ClusterName, Err]),
474+
_ = [force_delete_server(System, N) || N <- StartedIds],
475+
% we do not have a functioning cluster
476+
{error, cluster_not_formed}
477+
end
478+
end;
479+
{error, {partition_parallel_timeout, Started, _}} ->
453480
StartedIds = sort_by_local([I || #{id := I} <- Started], []),
454-
NotStartedIds = [I || #{id := I} <- NotStarted],
455-
%% try triggering elections until one succeeds
456-
%% TODO: handle case where no election was successfully triggered
457-
{value, TriggeredId} = lists:search(fun (N) ->
458-
ok == trigger_election(N)
459-
end, StartedIds),
460-
%% the triggered id is likely to become the leader so try that first
461-
case members(TriggeredId,
462-
length(ServerConfigs) * Timeout) of
463-
{ok, _, Leader} ->
464-
?INFO("ra: started cluster ~ts with ~b servers. "
465-
"~b servers failed to start: ~w. Leader: ~w",
466-
[ClusterName, length(ServerConfigs),
467-
length(NotStarted), NotStartedIds,
468-
Leader]),
469-
% we have a functioning cluster
470-
{ok, StartedIds, NotStartedIds};
471-
Err ->
472-
?WARN("ra: failed to form new cluster ~w. "
473-
"Error: ~w", [ClusterName, Err]),
474-
_ = [force_delete_server(System, N) || N <- StartedIds],
475-
% we do not have a functioning cluster
476-
{error, cluster_not_formed}
477-
end
481+
?WARN("ra: a member of cluster ~w failed to start within the expected time interval (~w)", [ClusterName, Timeout]),
482+
_ = [force_delete_server(System, N) || N <- StartedIds],
483+
{error, cluster_not_formed}
478484
end.
479485

480486
%% @doc Starts an individual ra server of a cluster.

src/ra_lib.erl

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -298,16 +298,23 @@ derive_safe_string(S, Num) ->
298298
end,
299299
string:slice(F(string:next_grapheme(S), []), 0, Num).
300300

301+
-spec partition_parallel(fun((any()) -> boolean()), [any()]) ->
302+
{ok, [any()], [any()]} | {error, any()}.
301303
partition_parallel(F, Es) ->
302304
partition_parallel(F, Es, 60000).
303305

306+
-spec partition_parallel(fun((any()) -> boolean()), [any()], timeout()) ->
307+
{ok, [any()], [any()]} | {error, any()}.
304308
partition_parallel(F, Es, Timeout) ->
305309
Parent = self(),
306310
Running = [{spawn_monitor(fun() ->
307311
Parent ! {self(), F(E)}
308312
end), E}
309313
|| E <- Es],
310-
collect(Running, {[], []}, Timeout).
314+
case collect(Running, {[], []}, Timeout) of
315+
{error, _} = E -> E;
316+
{Successes, Failures} -> {ok, Successes, Failures}
317+
end.
311318

312319
collect([], Acc, _Timeout) ->
313320
Acc;
@@ -322,7 +329,7 @@ collect([{{Pid, MRef}, E} | Next], {Left, Right}, Timeout) ->
322329
{'DOWN', MRef, process, Pid, Reason} ->
323330
collect(Next, {Left, [{E, Reason} | Right]}, Timeout)
324331
after Timeout ->
325-
exit(partition_parallel_timeout)
332+
{error, {partition_parallel_timeout, Left, Right}}
326333
end.
327334

328335
retry(Func, Attempts) ->
@@ -579,4 +586,11 @@ lists_detect_sort_test() ->
579586

580587
ok.
581588

589+
partition_parallel_test() ->
590+
?assertMatch({error, {partition_parallel_timeout, [], []}},
591+
partition_parallel(fun(_) ->
592+
timer:sleep(infinity)
593+
end, [1, 2, 3], 1000)),
594+
ok.
595+
582596
-endif.

src/ra_log_segment_writer.erl

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -160,11 +160,12 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
160160
end, [], Ranges),
161161

162162
_ = [begin
163-
{_, Failures} = ra_lib:partition_parallel(
164-
fun (TidRange) ->
165-
ok = flush_mem_table_ranges(TidRange, State),
166-
true
167-
end, Tabs, infinity),
163+
{ok, _, Failures} =
164+
ra_lib:partition_parallel(
165+
fun (TidRange) ->
166+
ok = flush_mem_table_ranges(TidRange, State),
167+
true
168+
end, Tabs, infinity),
168169
case Failures of
169170
[] ->
170171
%% this is what we expect

test/coordination_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -390,7 +390,7 @@ shrink_cluster_with_snapshot(Config) ->
390390
ClusterName = ?config(cluster_name, Config),
391391
Peers = start_peers([s1,s2,s3], PrivDir),
392392
ServerIds = server_ids(ClusterName, Peers),
393-
[A, B, C] = ServerIds,
393+
[_A, _B, _C] = ServerIds,
394394

395395
Machine = {module, ?MODULE, #{}},
396396
{ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds),

0 commit comments

Comments
 (0)