Skip to content

Commit 873db77

Browse files
committed
Fix snapshot live indexes replication bug
1 parent 13e1bf8 commit 873db77

File tree

8 files changed

+198
-39
lines changed

8 files changed

+198
-39
lines changed

src/ra_log.erl

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -706,10 +706,21 @@ last_written(#?MODULE{last_written_index_term = LWTI}) ->
706706
{ok, state()} | {not_found, state()}.
707707
set_last_index(Idx, #?MODULE{cfg = Cfg,
708708
range = Range,
709+
snapshot_state = SnapState,
709710
last_written_index_term = {LWIdx0, _}} = State0) ->
711+
Cur = ra_snapshot:current(SnapState),
710712
case fetch_term(Idx, State0) of
711-
{undefined, State} ->
713+
{undefined, State} when element(1, Cur) =/= Idx ->
714+
%% not found and Idx isn't equal to latest snapshot index
712715
{not_found, State};
716+
{_, State} when element(1, Cur) =:= Idx ->
717+
{_, SnapTerm} = Cur,
718+
%% Idx is equal to the current snapshot
719+
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
720+
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx),
721+
{ok, State#?MODULE{range = ra_range:limit(Idx + 1, Range),
722+
last_term = SnapTerm,
723+
last_written_index_term = Cur}};
713724
{Term, State1} ->
714725
LWIdx = min(Idx, LWIdx0),
715726
{LWTerm, State2} = fetch_term(LWIdx, State1),
@@ -1107,12 +1118,11 @@ assert(#?MODULE{cfg = #cfg{log_id = LogId},
11071118
range = Range,
11081119
snapshot_state = SnapState,
11091120
current_snapshot = CurrSnap,
1110-
live_indexes = LiveIndexes,
1111-
mem_table = _Mt
1121+
live_indexes = LiveIndexes
11121122
} = State) ->
11131123
%% TODO: remove this at some point?
1114-
?DEBUG("~ts: ra_log: asserting Range ~p Snapshot ~p LiveIndexes ~p",
1115-
[LogId, Range, CurrSnap, LiveIndexes]),
1124+
?DEBUG("~ts: ra_log: asserting Range ~p Snapshot ~p",
1125+
[LogId, Range, CurrSnap]),
11161126
%% perform assertions to ensure log state is correct
11171127
?assert(CurrSnap =:= ra_snapshot:current(SnapState)),
11181128
?assert(Range == undefined orelse

src/ra_log_segment.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,7 @@ segref(Filename) ->
456456
SegRef.
457457

458458
-type infos() :: #{size => non_neg_integer(),
459+
index_size => non_neg_integer(),
459460
max_count => non_neg_integer(),
460461
file_type => regular | symlink,
461462
ctime => integer(),
@@ -493,8 +494,7 @@ info(Filename, Live0)
493494
{_, _, Sz, _} = maps:get(I, Index),
494495
Acc + Sz
495496
end, 0, Live),
496-
Info = #{
497-
size => Seg#state.data_write_offset,
497+
Info = #{size => Seg#state.data_write_offset,
498498
index_size => Seg#state.data_start,
499499
file_type => T,
500500
links => Links,

src/ra_log_segments.erl

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -726,32 +726,43 @@ compaction_groups([], Groups, _Conf) ->
726726
lists:reverse(Groups);
727727
compaction_groups(Infos, Groups, Conf) ->
728728
case take_group(Infos, Conf, []) of
729+
{[], RemInfos} ->
730+
compaction_groups(RemInfos, Groups, Conf);
729731
{Group, RemInfos} ->
730732
compaction_groups(RemInfos, [Group | Groups], Conf)
731733
end.
732734

733-
%% TODO: try to take potential size into account
734735
take_group([], _, Acc) ->
735736
{lists:reverse(Acc), []};
736737
take_group([{#{num_entries := NumEnts,
737-
live_size := LiveSize}, Live, {_, _}} = E | Rem] = All,
738-
#{max_count := Mc,
738+
index_size := IdxSz,
739+
size := Sz,
740+
live_size := LiveSz}, Live, {_, _}} = E | Rem] = All,
741+
#{max_count := MaxCnt,
739742
max_size := MaxSz}, Acc) ->
740-
Num = ra_seq:length(Live),
741-
case Num / NumEnts < 0.5 of
743+
NumLive = ra_seq:length(Live),
744+
AllDataSz = Sz - IdxSz,
745+
%% group on either num reclaimable entries or data saved
746+
case NumLive / NumEnts < 0.5 orelse
747+
LiveSz / AllDataSz < 0.5 of
748+
%% fewer than half of the entries, or less than half of the data, are live
742749
true ->
743-
case Mc - Num < 0 orelse
744-
MaxSz - LiveSize < 0 of
750+
%% check that adding this segment to the current group will not
751+
%% exceed entry or size limits
752+
case MaxCnt - NumLive < 0 orelse
753+
MaxSz - LiveSz < 0 of
745754
true ->
755+
%% adding this segment to the group will exceed limits
756+
%% so returning current group
746757
{lists:reverse(Acc), All};
747758
false ->
748-
take_group(Rem, #{max_count => Mc - Num,
749-
max_size => MaxSz - LiveSize},
759+
take_group(Rem, #{max_count => MaxCnt - NumLive,
760+
max_size => MaxSz - LiveSz},
750761
[E | Acc])
751762
end;
752-
%% skip this secment
763+
%% skip this segment
753764
false when Acc == [] ->
754-
take_group(Rem, #{max_count => Mc,
765+
take_group(Rem, #{max_count => MaxCnt,
755766
max_size => MaxSz}, Acc);
756767
false ->
757768
{lists:reverse(Acc), Rem}

src/ra_mt.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ insert({Idx, _, _} = _Entry,
120120
end.
121121

122122
-spec insert_sparse(log_entry(), undefined | ra:index(), state()) ->
123-
{ok, state()} | {error, overwriting | gap_detected | limit_reached}.
123+
{ok, state()} | {error,
124+
overwriting |
125+
gap_detected |
126+
limit_reached}.
124127
insert_sparse({Idx, _, _} = Entry, LastIdx,
125128
#?MODULE{tid = Tid,
126129
indexes = Seq} = State) ->
@@ -299,6 +302,8 @@ get_items(Indexes, #?MODULE{} = State) ->
299302
non_neg_integer().
300303
delete(undefined) ->
301304
0;
305+
delete({indexes, _Tid, []}) ->
306+
0;
302307
delete({indexes, Tid, Seq}) ->
303308
NumToDelete = ra_seq:length(Seq),
304309
Start = ra_seq:first(Seq),

src/ra_server.erl

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1427,7 +1427,7 @@ handle_follower(#install_snapshot_rpc{term = Term,
14271427
meta = #{index := SnapIdx,
14281428
machine_version := SnapMacVer} = Meta,
14291429
leader_id = LeaderId,
1430-
chunk_state = {Num, _ChunkFlag}} = Rpc,
1430+
chunk_state = {Num, ChunkFlag}} = Rpc,
14311431
#{cfg := #cfg{log_id = LogId,
14321432
machine_version = MacVer}, log := Log0,
14331433
last_applied := LastApplied,
@@ -1443,7 +1443,17 @@ handle_follower(#install_snapshot_rpc{term = Term,
14431443
[LogId, SnapIdx, Term]),
14441444
SnapState0 = ra_log:snapshot_state(Log0),
14451445
{ok, SS} = ra_snapshot:begin_accept(Meta, SnapState0),
1446-
Log = ra_log:set_snapshot_state(SS, Log0),
1446+
Log1 = ra_log:set_snapshot_state(SS, Log0),
1447+
1448+
%% if the snapshot includes pre entries (live entries) then we need
1449+
%% to reset the log to the last applied index to avoid issues
1450+
Log = case ChunkFlag of
1451+
pre ->
1452+
{ok, L} = ra_log:set_last_index(LastApplied, Log1),
1453+
L;
1454+
_ ->
1455+
Log1
1456+
end,
14471457
{receive_snapshot, update_term(Term, State0#{log => Log,
14481458
leader_id => LeaderId}),
14491459
[{next_event, Rpc}, {record_leader_msg, LeaderId}]};
@@ -1538,17 +1548,18 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15381548
last_index = SnapIndex},
15391549
case ChunkFlag of
15401550
pre when is_list(ChunkOrEntries) ->
1541-
%% TODO: we may need to reset the log here to
1542-
%% the last applied index as we
1543-
%% dont know for sure indexes after last applied
1551+
%% reset last index to last applied
1552+
%% as we dont know for sure indexes after last applied
15441553
%% are of the right term
15451554
{LastIndex, _} = ra_log:last_index_term(Log00),
1546-
{Log0, _} = lists:foldl(
1555+
{Log, _} = lists:foldl(
15471556
fun ({I, _, _} = E, {L0, LstIdx}) ->
15481557
{ok, L} = ra_log:write_sparse(E, LstIdx, L0),
15491558
{L, I}
15501559
end, {Log00, LastIndex}, ChunkOrEntries),
1551-
State = update_term(Term, State0#{log => Log0}),
1560+
?DEBUG("~ts: receiving snapshot log last index ~p",
1561+
[LogId, ra_log:last_index_term(Log)]),
1562+
State = update_term(Term, State0#{log => Log}),
15521563
{receive_snapshot, State, [{reply, Reply}]};
15531564
next ->
15541565
SnapState0 = ra_log:snapshot_state(Log00),
@@ -1606,6 +1617,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16061617
membership =>
16071618
get_membership(ClusterIds, State0),
16081619
machine_state => MacState}),
1620+
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapIndex),
16091621
%% it was the last snapshot chunk so we can revert back to
16101622
%% follower status
16111623
{follower, persist_last_applied(State), [{reply, Reply} |

src/ra_server_proc.erl

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,7 +1535,9 @@ handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, _Id, Term}},
15351535
#state{server_state = SS0,
15361536
monitors = Monitors,
15371537
conf = #conf{snapshot_chunk_size = ChunkSize,
1538-
install_snap_rpc_timeout = InstallSnapTimeout} = Conf} = State0,
1538+
log_id = LogId,
1539+
install_snap_rpc_timeout = InstallSnapTimeout} = Conf}
1540+
= State0,
15391541
Actions) ->
15401542
case lists:member(ToNode, [node() | nodes()]) of
15411543
true ->
@@ -1546,7 +1548,7 @@ handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, _Id, Term}},
15461548
Pid = spawn(fun () ->
15471549
try send_snapshots(Id, Term, To,
15481550
ChunkSize, InstallSnapTimeout,
1549-
SnapState, Machine) of
1551+
SnapState, Machine, LogId) of
15501552
_ -> ok
15511553
catch
15521554
C:timeout:S ->
@@ -1906,29 +1908,39 @@ read_entries0(From, Idxs, #state{server_state = #{log := Log}} = State) ->
19061908
{keep_state, State, [{reply, From, {ok, ReadState}}]}.
19071909

19081910
send_snapshots(Id, Term, {_, ToNode} = To, ChunkSize,
1909-
InstallTimeout, SnapState, Machine) ->
1911+
InstallTimeout, SnapState, Machine, LogId) ->
19101912
Context = ra_snapshot:context(SnapState, ToNode),
19111913
{ok, #{machine_version := SnapMacVer} = Meta, ReadState} =
19121914
ra_snapshot:begin_read(SnapState, Context),
19131915

1914-
%% only send the snapshot if the target server can accept it
1915-
%% TODO: grab the last_applied index also and use this to floor
1916-
%% the live indexes
1916+
%% TODO: consolidate getting the context, machine version and last
1917+
%% applied index in one rpc, and handle errors
19171918
TheirMacVer = erpc:call(ToNode, ra_machine, version, [Machine]),
19181919

1919-
%% rpc the check what their
1920+
%% only send the snapshot if the target server can accept it
19201921
case SnapMacVer > TheirMacVer of
19211922
true ->
1923+
?DEBUG("~ts: not sending snapshot to ~w as their machine version ~b "
1924+
"is lower than snapshot machine version ~b",
1925+
[LogId, To, TheirMacVer, SnapMacVer]),
19221926
ok;
19231927
false ->
1928+
#{last_applied := LastApplied} = erpc:call(ToNode,
1929+
ra_counters,
1930+
counters,
1931+
[To, [last_applied]]),
19241932
RPC = #install_snapshot_rpc{term = Term,
19251933
leader_id = Id,
19261934
meta = Meta},
1927-
case ra_snapshot:indexes(ra_snapshot:current_snapshot_dir(SnapState)) of
1928-
{ok, [_|_] = Indexes} ->
1935+
case ra_snapshot:indexes(
1936+
ra_snapshot:current_snapshot_dir(SnapState)) of
1937+
{ok, [_|_] = Indexes0} ->
1938+
%% remove all indexes lower than the target's last applied
1939+
Indexes = ra_seq:floor(LastApplied + 1, Indexes0),
1940+
?DEBUG("~ts: sending live indexes ~w to ~w ",
1941+
[LogId, ra_seq:range(Indexes), To]),
19291942
%% there are live indexes to send before the snapshot
1930-
%% %% TODO: only send live indexes higher than the follower's
1931-
%% last_applied index
1943+
%% TODO: write ra_seq:list_chunk function to avoid expansion
19321944
Idxs = lists:reverse(ra_seq:expand(Indexes)),
19331945
Flru = lists:foldl(
19341946
fun (Is, F0) ->

test/ra_kv_SUITE.erl

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ all() ->
2020

2121
all_tests() ->
2222
[
23-
basics
23+
basics,
24+
snapshot_replication
2425
].
2526

2627
groups() ->
@@ -56,6 +57,71 @@ end_per_testcase(_TestCase, _Config) ->
5657
%%%===================================================================
5758

5859

60+
snapshot_replication(_Config) ->
61+
Members = [{kv1, node()}, {kv2, node()}],
62+
KvId = hd(Members),
63+
{ok, _, _} = ra_kv:start_cluster(?SYS, ?FUNCTION_NAME,
64+
#{members => Members}),
65+
ra:transfer_leadership(KvId, KvId),
66+
{ok, #{}} = ra_kv:put(KvId, <<"k1">>, <<"k1-value01">>, 5000),
67+
%% write 5000 entries, each under its own key
68+
[{ok, #{}} = ra_kv:put(KvId, integer_to_binary(I), I, 5000)
69+
|| I <- lists:seq(1, 5000)],
70+
71+
?assertMatch({ok, #{machine := #{num_keys := _}}, KvId},
72+
ra:member_overview(KvId)),
73+
ra_log_wal:force_roll_over(ra_log_wal),
74+
%% wait for rollover processing
75+
ra_log_wal:last_writer_seq(ra_log_wal, <<>>),
76+
%% wait for segment writer to process
77+
ra_log_segment_writer:await(ra_log_segment_writer),
78+
%% prompt ra_kv to take a snapshot
79+
ok = ra:aux_command(KvId, take_snapshot),
80+
ok = ra_lib:retry(
81+
fun () ->
82+
{ok, #{log := #{snapshot_index := SnapIdx,
83+
last_index := LastIdx}}, _} =
84+
ra:member_overview(KvId),
85+
SnapIdx == LastIdx
86+
end, 100, 100),
87+
88+
KvId3 = {kv3, node()},
89+
ok = ra_kv:add_member(?SYS, KvId3, KvId),
90+
KvId3Pid = whereis(kv3),
91+
?assert(is_pid(KvId3Pid)),
92+
{ok, #{}} = ra_kv:put(KvId, <<"k3">>, <<"k3-value">>, 5000),
93+
{ok, #{}} = ra_kv:put(KvId, <<"k4">>, <<"k4-value">>, 5000),
94+
ok = ra:aux_command(KvId, take_snapshot),
95+
% timer:sleep(1000),
96+
{ok, #{log := #{last_index := Kv1LastIndex }}, _} = ra:member_overview(KvId),
97+
ok = ra_lib:retry(
98+
fun () ->
99+
{ok, #{log := #{last_index := LastIdx}}, _} =
100+
ra:member_overview(KvId3),
101+
Kv1LastIndex == LastIdx
102+
end, 100, 100),
103+
ct:pal("counters ~p", [ra_counters:counters(KvId3, [last_applied])]),
104+
%% ensure kv3 did not crash during snapshot replication
105+
?assertEqual(KvId3Pid, whereis(kv3)),
106+
107+
ok = ra:stop_server(default, KvId3),
108+
109+
{ok, #{}} = ra_kv:put(KvId, <<"k5">>, <<"k5-value">>, 5000),
110+
{ok, #{}} = ra_kv:put(KvId, <<"k6">>, <<"k6-value">>, 5000),
111+
ok = ra:aux_command(KvId, take_snapshot),
112+
113+
ok = ra:restart_server(default, KvId3),
114+
{ok, #{log := #{last_index := Kv1LastIndex2}}, _} = ra:member_overview(KvId),
115+
ok = ra_lib:retry(
116+
fun () ->
117+
{ok, #{log := #{last_index := LastIdx}}, _} =
118+
ra:member_overview(KvId3),
119+
Kv1LastIndex2 == LastIdx
120+
end, 100, 100),
121+
122+
ra:delete_cluster([KvId, {kv2, node()}, KvId3]),
123+
ok.
124+
59125
basics(_Config) ->
60126
Members = [{kv1, node()}],
61127
KvId = hd(Members),
@@ -136,4 +202,5 @@ basics(_Config) ->
136202
undefined, 1000),
137203
?assertEqual(Reads4, Reads5),
138204
ct:pal("counters ~p", [ra_counters:overview(KvId)]),
205+
ra:delete_cluster([KvId, KvId2]),
139206
ok.

0 commit comments

Comments
 (0)