Skip to content

Commit 41f91d2

Browse files
committed
wip
1 parent 1520b6b commit 41f91d2

File tree

7 files changed

+71
-8
lines changed

7 files changed

+71
-8
lines changed

src/ra.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ start(Params) when is_list(Params) ->
140140
[ok = application:set_env(ra, Param, Value)
141141
|| {Param, Value} <- Params],
142142
Res = application:ensure_all_started(ra),
143+
ra_env:configure_logger(logger),
143144
_ = ra_system:start_default(),
144145
Res.
145146

src/ra_kv.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
-export([
1919
start_cluster/3,
2020
add_member/3,
21+
member_overview/1,
2122

2223
put/4,
2324
get/3,
@@ -62,7 +63,8 @@ start_cluster(System, ClusterName, #{members := ServerIds})
6263
#{id => Id,
6364
uid => UId,
6465
cluster_name => ClusterName,
65-
log_init_args => #{uid => UId},
66+
log_init_args => #{uid => UId,
67+
min_snapshot_interval => 0},
6668
initial_members => ServerIds,
6769
machine => Machine}
6870
end || Id <- ServerIds],
@@ -75,13 +77,22 @@ add_member(System, {Name, _} = Id, LeaderId) ->
7577
Config = #{id => Id,
7678
uid => UId,
7779
cluster_name => Name,
78-
log_init_args => #{uid => UId},
80+
log_init_args => #{uid => UId,
81+
min_snapshot_interval => 0},
7982
initial_members => Members,
8083
machine => Machine},
8184
ok = ra:start_server(System, Config),
8285
{ok, _, _} = ra:add_member(LeaderId, Id),
8386
ok.
8487

88+
member_overview(ServerId) ->
89+
case ra:member_overview(ServerId) of
90+
{ok, O, _} ->
91+
maps:with([log, machine], O);
92+
Err ->
93+
Err
94+
end.
95+
8596

8697
%% client
8798
-spec put(ra:server_id(), key(), value(), non_neg_integer()) ->

src/ra_log.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -725,9 +725,7 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
725725
{state(), [effect()]}.
726726
handle_event({written, Term, WrittenSeq},
727727
#?MODULE{cfg = Cfg,
728-
% range = Range,
729728
snapshot_state = SnapState,
730-
% first_index = FirstIdx,
731729
pending = Pend0} = State0) ->
732730
CurSnap = ra_snapshot:current(SnapState),
733731
%% gap detection
@@ -825,7 +823,7 @@ handle_event(major_compaction, #?MODULE{reader = Reader0,
825823
snapshot_state = SS} = State) ->
826824
case ra_snapshot:current(SS) of
827825
{SnapIdx, _} ->
828-
Effs = ra_log_segments:schedule_compaction(major,SnapIdx,
826+
Effs = ra_log_segments:schedule_compaction(major, SnapIdx,
829827
LiveIndexes, Reader0),
830828
{State, Effs};
831829
_ ->
@@ -841,6 +839,7 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
841839
snapshot_state = SnapState0} = State0)
842840
%% only update snapshot if it is newer than the last snapshot
843841
when SnapIdx >= FstIdx ->
842+
% ?assert(ra_snapshot:pending(SnapState0) =/= undefined),
844843
SnapState1 = ra_snapshot:complete_snapshot(Snap, SnapKind, LiveIndexes,
845844
SnapState0),
846845
case SnapKind of

src/ra_log_segments.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,7 @@ make_links(Dir, To, From)
697697
_ = prim_file:delete(SymFn),
698698
%% make a symlink from the compacted target segment to a new .link
699699
%% where the compacted indexes now can be found
700-
ok = prim_file:make_symlink(To, SymFn),
700+
ok = prim_file:make_symlink(filename:basename(To), SymFn),
701701
%% rename to link to replace original segment
702702
ok = prim_file:rename(SymFn, SegFn)
703703
end || FromFn <- From],

src/ra_log_snapshot_state.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ smallest(Table, UId) when is_binary(UId) ->
2828
ets:lookup_element(Table, UId, 3, 0).
2929

3030
-spec live_indexes(ets:table(), ra:uid()) ->
31-
ra:index().
31+
ra_seq:state().
3232
live_indexes(Table, UId) when is_binary(UId) ->
3333
ets:lookup_element(Table, UId, 4, []).
3434

src/ra_snapshot.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ last_index_for(UId) ->
362362
end.
363363

364364
-spec begin_snapshot(meta(), MacModule :: module(),
365-
MacStateb :: term(), kind(), state()) ->
365+
MacState :: term(), kind(), state()) ->
366366
{state(), [effect()]}.
367367
begin_snapshot(#{index := Idx, term := Term} = Meta, MacMod, MacState, SnapKind,
368368
#?MODULE{module = Mod,

test/ra_log_2_SUITE.erl

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ all_tests() ->
5050
snapshot_written_after_installation,
5151
oldcheckpoints_deleted_after_snapshot_install,
5252
append_after_snapshot_installation,
53+
release_cursor_after_snapshot_installation,
5354
written_event_after_snapshot_installation,
5455
update_release_cursor,
5556
update_release_cursor_with_machine_version,
@@ -1285,6 +1286,57 @@ append_after_snapshot_installation(Config) ->
12851286
{[_, _], _} = ra_log_take(16, 17, Log),
12861287
ok.
12871288

1289+
release_cursor_after_snapshot_installation(Config) ->
1290+
Log0 = ra_log_init(Config, #{min_snapshot_interval => 0}),
1291+
{0, 0} = ra_log:last_index_term(Log0),
1292+
Log1 = assert_log_events(write_n(1, 16, 2, Log0),
1293+
fun (L) ->
1294+
LW = ra_log:last_written(L),
1295+
{15, 2} == LW
1296+
end),
1297+
1298+
Log2 = Log1,
1299+
1300+
%% create snapshot chunk
1301+
Meta = meta(15, 2, [?N1]),
1302+
Chunk = create_snapshot_chunk(Config, Meta, [1, 5, 10], #{}),
1303+
SnapState0 = ra_log:snapshot_state(Log2),
1304+
{ok, SnapState1} = ra_snapshot:begin_accept(Meta, SnapState0),
1305+
Machine = {machine, ?MODULE, #{}},
1306+
{SnapState, _, LiveIndexes, AEffs} = ra_snapshot:complete_accept(Chunk, 1, Machine,
1307+
SnapState1),
1308+
run_effs(AEffs),
1309+
{ok, Log3, Effs4} = ra_log:install_snapshot({15, 2}, ?MODULE, LiveIndexes,
1310+
ra_log:set_snapshot_state(SnapState, Log2)),
1311+
1312+
run_effs(Effs4),
1313+
{15, 2} = ra_snapshot:current(ra_log:snapshot_state(Log3)),
1314+
1315+
%% Write some entries
1316+
Log4 = assert_log_events(write_n(16, 20, 2, Log3),
1317+
fun (L) ->
1318+
LW = ra_log:last_written(L),
1319+
{19, 2} == LW
1320+
end),
1321+
1322+
%% then take a snapshot
1323+
{Log5, Effs5} = ra_log:update_release_cursor(19, #{?N1 => new_peer(),
1324+
?N2 => new_peer()},
1325+
?MODULE, [1, 5, 10, 17], Log4),
1326+
1327+
run_effs(Effs5),
1328+
%% ensure snapshot index has been updated and 1 segment deleted
1329+
Log = assert_log_events(Log5,
1330+
fun (L) ->
1331+
{19, 2} == ra_log:snapshot_index_term(L)
1332+
% andalso
1333+
% length(find_segments(Config)) == 1
1334+
end),
1335+
1336+
ct:pal("Log ~p", [Log]),
1337+
1338+
ok.
1339+
12881340
written_event_after_snapshot_installation(Config) ->
12891341
logger:set_primary_config(level, all),
12901342
%% simulates scenario where a server receives a written event from the wal

0 commit comments

Comments
 (0)