Skip to content

Commit 52ec2bc

Browse files
committed
Refactor snapshot receive to consolidate temporary state
- Remove the snapshot_next_event field; store the deferred event in snapshot_phase as {awaiting_pending, EventType, Rpc}
- Add ra_log:has_pending/1 for an O(1) pending check
- Simplify abort_receive to reset the log based on snapshot_has_live_indexes rather than checking individual phases
1 parent 088bc1a commit 52ec2bc

File tree

5 files changed

+70
-25
lines changed

5 files changed

+70
-25
lines changed

src/ra_log.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
can_write/1,
4747
exists/2,
48+
has_pending/1,
4849
overview/1,
4950
%% config
5051
write_config/2,
@@ -1386,6 +1387,12 @@ exists({Idx, Term}, Log0) ->
13861387
end.
13871388

13881389
-spec overview(state()) -> overview().
1390+
%% Reports whether the log state currently holds pending entries.
%% Returns `false' when the `pending' field of the state record is the
%% empty list and `true' for any other value. Only the shape of the field
%% is matched; nothing is counted or traversed.
%% NOTE(review): in this hunk the definition is inserted between the
%% `-spec overview(state()) -> overview().' line and the `overview/1'
%% clause it belongs to. That still compiles, but consider moving this
%% function so the overview spec stays adjacent to its function.
-spec has_pending(state()) -> boolean().
1391+
has_pending(#?MODULE{pending = []}) ->
1392+
false;
1393+
has_pending(#?MODULE{}) ->
1394+
true.
1395+
13891396
overview(#?MODULE{range = Range,
13901397
last_term = LastTerm,
13911398
last_written_index_term = LWIT,

src/ra_log_segments.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,8 +520,8 @@ segment_read_plan(#cfg{log_id = LogId} = Cfg,
520520
undefined ->
521521
%% not found, not good
522522
?WARN("~ts: read plan request did not found all requested indexes"
523-
" missing ~w segrefs ~p",
524-
[LogId, Indexes, ra_lol:to_list(SegRefs)]),
523+
" missing ~w segrefs left ~p",
524+
[LogId, Indexes, SegRefs]),
525525
lists:reverse(Acc)
526526
end.
527527

src/ra_server.erl

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,17 @@
9898
consistent_query_ref()}),
9999
pending_consistent_queries := [consistent_query_ref()],
100100
commit_latency => option(non_neg_integer()),
101-
snapshot_phase => chunk_flag(),
101+
%% snapshot_phase tracks the current phase of snapshot reception:
102+
%% - chunk_flag() for normal phases (init, pre, next, last)
103+
%% - {awaiting_pending, EventType, Rpc} when waiting for pending WAL
104+
%% writes to complete before finalizing snapshot installation
105+
snapshot_phase => chunk_flag() |
106+
{awaiting_pending, term(), #install_snapshot_rpc{}},
102107
pending_release_cursor => {ra_index(), term(),
103108
[ra_machine:release_cursor_condition()]},
104-
%% TODO: review these 3, specific to handle_receive_snapshot
105-
%% could temporarily increase state size over small map (32)
109+
%% temporary state for handle_receive_snapshot
106110
current_event_type => term(),
107-
snapshot_has_live_indexes => boolean(),
108-
snapshot_next_event => term()
111+
snapshot_has_live_indexes => boolean()
109112
}.
110113

111114
-type state() :: ra_server_state().
@@ -1621,7 +1624,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16211624
%% works as an assertion also
16221625
{AcceptingSnapIdx, _} = ra_snapshot:accepting(SnapState0),
16231626
SnapshotHasLiveIndexes = maps:get(snapshot_has_live_indexes, State0, false),
1624-
LogHasPendingIndexes = maps:get(num_pending, ra_log:overview(Log00)) > 0,
1627+
LogHasPendingIndexes = ra_log:has_pending(Log00),
16251628
case ChunkFlag of
16261629
init when SnapPhase == init andalso
16271630
SnapIndex == AcceptingSnapIdx ->
@@ -1673,16 +1676,14 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16731676
when SnapshotHasLiveIndexes andalso
16741677
LogHasPendingIndexes ->
16751678
%% we cannot yet complete the snapshot as there are pending
1676-
%% log indexes
1679+
%% log indexes, defer completion until they are written
16771680
EventType = maps:get(current_event_type, State0),
16781681
?DEBUG("~ts: receiving snapshot chunk: ~b / ~w, index ~b, term ~b "
16791682
"cannot yet complete as log has pending indexes",
16801683
[LogId, Num, ChunkFlag, SnapIndex, SnapTerm]),
16811684
{receive_snapshot,
1682-
State0#{snapshot_next_event => {EventType, Rpc}}, []};
1685+
State0#{snapshot_phase => {awaiting_pending, EventType, Rpc}}, []};
16831686
last ->
1684-
%% TODO: we can't do this bit until all pending log entries have
1685-
%% been processed
16861687
?assert(SnapIndex == AcceptingSnapIdx),
16871688
?DEBUG("~ts: receiving snapshot chunk: ~b / ~w, index ~b, term ~b",
16881689
[LogId, Num, ChunkFlag, SnapIndex, SnapTerm]),
@@ -1737,7 +1738,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
17371738
machine_state => MacState}),
17381739
State = maps:without([snapshot_phase,
17391740
snapshot_has_live_indexes,
1740-
snapshot_next_event], State1),
1741+
current_event_type], State1),
17411742
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapIndex),
17421743
%% it was the last snapshot chunk so we can revert back to
17431744
%% follower status
@@ -1757,15 +1758,27 @@ handle_receive_snapshot(#append_entries_rpc{term = Term} = Msg,
17571758
State = abort_receive(State0),
17581759
{follower, update_term(Term, State), [{next_event, Msg}]};
17591760
handle_receive_snapshot({ra_log_event, Evt},
1760-
#{cfg := #cfg{log_id = _LogId},
1761+
#{cfg := #cfg{log_id = LogId},
1762+
snapshot_phase := Phase,
17611763
log := Log0} = State0) ->
1762-
% simply forward all other events to ra_log
1763-
% whilst the snapshot is being received
1764+
%% forward log events to ra_log whilst the snapshot is being received
17641765
{Log, Effects} = ra_log:handle_event(Evt, Log0),
1765-
case maps:take(snapshot_next_event, State0) of
1766-
{{EventType, NextEvt}, State} ->
1767-
{receive_snapshot, State#{log => Log},
1768-
[{next_event, EventType, NextEvt} | Effects]};
1766+
case Phase of
1767+
{awaiting_pending, EventType, DeferredRpc} ->
1768+
1769+
case ra_log:has_pending(Log) of
1770+
false ->
1771+
?DEBUG("~ts: pending indexes cleared, completing snapshot",
1772+
[LogId]),
1773+
%% replay the deferred last chunk, set phase to next so
1774+
%% the replayed event doesn't hit the awaiting_pending guard
1775+
{receive_snapshot,
1776+
State0#{log => Log,
1777+
snapshot_phase => next},
1778+
[{next_event, EventType, DeferredRpc} | Effects]};
1779+
true ->
1780+
{receive_snapshot, State0#{log => Log}, Effects}
1781+
end;
17691782
_ ->
17701783
{receive_snapshot, State0#{log => Log}, Effects}
17711784
end;
@@ -1824,23 +1837,24 @@ handle_receive_snapshot(Msg, State) ->
18241837
%% TODO: work out what else to handle
18251838
{receive_snapshot, State, [{reply, {error, {unsupported_call, Msg}}}]}.
18261839

1827-
abort_receive(#{snapshot_phase := Phase,
1840+
%% Aborts an in-progress snapshot receive and returns the cleaned-up
%% server state.
%% Steps, as of this change:
%%  1. call ra_snapshot:abort_accept/1 on the log's snapshot state and
%%     store the result back into the log;
%%  2. if the state map has `snapshot_has_live_indexes' set to `true',
%%     call ra_log:set_last_index/2 with `last_applied' (the match on
%%     `{ok, Log2}' asserts that this succeeds);
%%  3. drop the temporary keys `snapshot_phase',
%%     `snapshot_has_live_indexes' and `current_event_type' and pass the
%%     resulting state to clear_leader_id/1.
%% The function head still requires the `snapshot_phase' key to be present
%% in the state map, although its value is no longer inspected.
abort_receive(#{snapshot_phase := _Phase,
18281841
last_applied := LastApplied,
18291842
log := Log0} = State) ->
18301843
SnapState0 = ra_log:snapshot_state(Log0),
18311844
SnapState = ra_snapshot:abort_accept(SnapState0),
18321845
Log1 = ra_log:set_snapshot_state(SnapState, Log0),
1833-
Log = case Phase of
1834-
pre ->
1846+
Log = case State of
1847+
#{snapshot_has_live_indexes := true} ->
1848+
%% live indexes were written during pre phase,
1849+
%% reset log index to undo them
18351850
{ok, Log2} = ra_log:set_last_index(LastApplied, Log1),
18361851
Log2;
18371852
_ ->
18381853
Log1
18391854
end,
18401855
clear_leader_id(maps:without([snapshot_phase,
18411856
snapshot_has_live_indexes,
1842-
current_event_type,
1843-
snapshot_next_event],
1857+
current_event_type],
18441858
State#{log => Log})).
18451859

18461860
-spec handle_await_condition(ra_msg(), ra_server_state()) ->

test/ra_log_2_SUITE.erl

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ all_tests() ->
7373
sparse_read,
7474
read_plan_modified,
7575
read_plan,
76+
read_plan_missing_index,
7677
sparse_read_out_of_range,
7778
sparse_read_out_of_range_2,
7879
written_event_after_snapshot,
@@ -586,6 +587,28 @@ read_plan(Config) ->
586587
[] = Indexes -- [I || I <- maps:keys(EntriesOut)],
587588
ok.
588589

590+
%% Builds a read plan for the indexes [128, 301] after the log's snapshot
%% index has reached 300, then executes the plan.
%% Presumably index 128 is no longer readable at that point (only 1 and 299
%% are passed alongside the release cursor) - TODO confirm against
%% ra_log:update_release_cursor/5.
%% NOTE(review): the only assertion is that ra_log_read_plan:info/1 returns
%% a map; the result of executing the plan is not checked.
read_plan_missing_index(Config) ->
591+
Log0 = ra_log_init(Config),
592+
Log1 = write_and_roll(1, 500, 1, Log0, 50),
593+
Log2 = deliver_all_log_events(Log1, 100),
594+
{Log3, Effs} = ra_log:update_release_cursor(300, #{}, macctx(),
595+
[1, 299], Log2),
596+
ct:pal("Effs ~p", [Effs]),
597+
run_effs(Effs),
598+
599+
%% wait until the log overview reports snapshot index 300
Log4 = assert_log_events(Log3,
600+
fun (L) ->
601+
#{snapshot_index := SnapIdx} = ra_log:overview(L),
602+
SnapIdx == 300
603+
end),
604+
%% NOTE(review): fixed 100 ms sleep; what it waits for is not visible
%% here - consider waiting on an explicit event instead
timer:sleep(100),
605+
flush(),
606+
ReadPlan = ra_log:partial_read([128, 301], Log4, fun (_, _, Cmd) -> Cmd end),
607+
?assert(is_map(ra_log_read_plan:info(ReadPlan))),
608+
ct:pal("ReadPlan ~p", [ReadPlan]),
609+
%% NOTE(review): `Entries' is bound but never used, which will raise an
%% unused-variable compiler warning; either assert on it or bind `_Entries'
{Entries, _} = ra_log_read_plan:execute(ReadPlan, undefined),
610+
ok.
611+
589612
written_event_after_snapshot(Config) ->
590613
Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}),
591614
Log1 = ra_log:append({1, 1, <<"one">>}, Log0),

test/ra_server_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ setup_log() ->
212212
end),
213213
meck:expect(ra_log, write_config, fun ra_log_memory:write_config/2),
214214
meck:expect(ra_log, next_index, fun ra_log_memory:next_index/1),
215+
meck:expect(ra_log, has_pending, fun (_) -> false end),
215216
meck:expect(ra_log, append, fun ra_log_memory:append/2),
216217
meck:expect(ra_log, write, fun ra_log_memory:write/2),
217218
meck:expect(ra_log, write_sparse, fun ra_log_memory:write_sparse/3),

0 commit comments

Comments
 (0)