Commit f17675a

add segment copy op
1 parent 13d4e03 commit f17675a

5 files changed, +88 -19 lines changed

docs/internals/COMPACTION.md

Lines changed: 2 additions & 0 deletions
@@ -101,6 +101,8 @@ e.g. the compacted segment `001.compacting` to `001.segment` without breaking
 any references to the segment. Single segment compaction should only be triggered
 when a certain limit has been reached, e.g. > 50% of indexes can be cleared up.
 
+TODO: how to handle compaction of segments that have indexes that never were
+committed, i.e. overwritten?
 
 
 #### When does phase 3 compaction run?
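
Not part of the diff: a minimal sketch of the "> 50% of indexes can be cleared up" trigger described above, assuming the caller already knows how many indexes a segment holds and which of them are still live. The module name, function name and threshold are illustrative only, not the repository's actual compaction code.

%% Sketch only: a single segment qualifies for compaction once more than
%% half of its indexes could be cleared up.
-module(compaction_trigger_sketch).
-export([should_compact/2]).

-spec should_compact(TotalIndexes :: pos_integer(),
                     LiveIndexes :: [non_neg_integer()]) -> boolean().
should_compact(TotalIndexes, LiveIndexes) when TotalIndexes > 0 ->
    Reclaimable = TotalIndexes - length(LiveIndexes),
    Reclaimable / TotalIndexes > 0.5.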

src/ra_log.erl

Lines changed: 16 additions & 12 deletions
@@ -223,6 +223,8 @@ init(#{uid := UId,
     % this queries the segment writer and thus blocks until any
     % segments it is currently processing have been finished
     MtRange = ra_mt:range(Mt0),
+    %% TODO: init ra_log_reader here instead and let it take care of range
+    %% calculation and segref compaction
     {{FirstIdx, LastIdx0}, SegRefs} = case recover_ranges(UId, MtRange, SegWriter) of
                                           {undefined, SRs} ->
                                               {{-1, -1}, SRs};
@@ -938,31 +940,30 @@ install_snapshot({SnapIdx, SnapTerm} = IdxTerm, MacMod, LiveIndexes,
     put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, SnapIdx),
     put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, SnapIdx),
 
-    CompEffs = schedule_compaction(SnapIdx, State0),
-
     {SnapState, Checkpoints} =
         ra_snapshot:take_older_checkpoints(SnapIdx, SnapState0),
     CPEffects = [{delete_snapshot,
                   ra_snapshot:directory(SnapState, checkpoint),
                   Checkpoint} || Checkpoint <- Checkpoints],
-    State = State0#?MODULE{snapshot_state = SnapState,
-                           first_index = SnapIdx + 1,
-                           last_index = SnapIdx,
-                           last_term = SnapTerm,
-                           last_written_index_term = IdxTerm},
-    %% TODO: more mt entries could potentially be cleared up in the
-    %% mem table here
     SmallestLiveIndex = case ra_seq:first(LiveIndexes) of
                             undefined ->
                                 SnapIdx + 1;
                             I ->
                                 I
                         end,
+    %% TODO: more mt entries could potentially be cleared up in the
+    %% mem table here
     {Spec, Mt} = ra_mt:set_first(SmallestLiveIndex, Mt0),
     ok = exec_mem_table_delete(Names, UId, Spec),
-    {ok, State#?MODULE{live_indexes = LiveIndexes,
-                       mem_table = Mt},
-     CompEffs ++ CPEffects}.
+    State = State0#?MODULE{snapshot_state = SnapState,
+                           first_index = SnapIdx + 1,
+                           last_index = SnapIdx,
+                           last_term = SnapTerm,
+                           live_indexes = LiveIndexes,
+                           mem_table = Mt,
+                           last_written_index_term = IdxTerm},
+    CompEffs = schedule_compaction(SnapIdx, State),
+    {ok, State, CompEffs ++ CPEffects}.
 
 
 -spec recover_snapshot(State :: state()) ->
@@ -1261,6 +1262,9 @@ schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{},
               end, lists:reverse(Compactable)),
     SnapDir = ra_snapshot:current_snapshot_dir(SnapState),
 
+    %% TODO: minor compactions should also delete / truncate
+    %% segments with completely overwritten indexes
+
     Self = self(),
     Fun = fun () ->
                   {ok, Indexes} = ra_snapshot:indexes(SnapDir),

src/ra_log_reader.erl

Lines changed: 3 additions & 2 deletions
@@ -109,6 +109,7 @@ update_segments(NewSegmentRefs,
                              segment_refs = SegRefs0} = State) ->
 
     SegmentRefs0 = ra_lol:to_list(SegRefs0),
+    %% TODO: capture segrefs removed by compact_segrefs/2 and delete them
    SegmentRefsComp = compact_segrefs(NewSegmentRefs, SegmentRefs0),
    SegmentRefsCompRev = lists:reverse(SegmentRefsComp),
    SegRefs = ra_lol:from_list(fun seg_ref_gt/2, SegmentRefsCompRev),
@@ -470,13 +471,13 @@ compact_seg_refs_test() ->
 
 compact_segref_3_test() ->
     Data = [
-            {{2, 7}, "B"},
+            {{2, 7}, "C"},
             %% this entry has overwritten the prior two
             {{5, 10}, "B"},
             {{1, 4}, "A"}
            ],
     Res = compact_segrefs(Data, []),
-    ?assertMatch([{{2, 7}, "B"},
+    ?assertMatch([{{2, 7}, "C"},
                   {{1, 1}, "A"}], Res),
     ok.

src/ra_log_segment.erl

Lines changed: 34 additions & 1 deletion
@@ -22,13 +22,15 @@
          max_count/1,
          filename/1,
          segref/1,
-         is_same_as/2]).
+         is_same_as/2,
+         copy/3]).
 
 -export([dump/1,
          dump_index/1]).
 
 -include("ra.hrl").
 
+-include_lib("stdlib/include/assert.hrl").
 -include_lib("kernel/include/file.hrl").
 
 -define(VERSION, 2).
@@ -474,6 +476,23 @@ close(#state{cfg = #cfg{fd = Fd}}) ->
     _ = file:close(Fd),
     ok.
 
+-spec copy(state(), file:filename_all(), [ra:index()]) ->
+    {ok, state()} | {error, term()}.
+copy(#state{} = State0, FromFile, Indexes)
+  when is_list(Indexes) ->
+    {ok, From} = open(FromFile, #{mode => read}),
+    %% TODO: the current approach recalculates the CRC and isn't completely
+    %% optimal. Also it does not allow for a future where copy_file_range may
+    %% be available
+    State = lists:foldl(
+              fun (I, S0) ->
+                      {ok, Term, Data} = simple_read(From, I),
+                      {ok, S} = append(S0, I, Term, Data),
+                      S
+              end, State0, lists:sort(Indexes)),
+    close(From),
+    sync(State).
+
 %%% Internal
 
 is_same_filename_all(Fn, Fn) ->
@@ -693,6 +712,20 @@ is_full(#state{cfg = #cfg{max_size = MaxSize},
     IndexOffset >= DataStart orelse
         (DataOffset - DataStart) > MaxSize.
 
+simple_read(#state{cfg = #cfg{fd = Fd},
+                   index = SegIndex}, Idx)
+  when is_map_key(Idx, SegIndex) ->
+    {Term, Pos, Len, _} = map_get(Idx, SegIndex),
+    case file:pread(Fd, Pos, Len) of
+        {ok, Data} ->
+            ?assert(byte_size(Data) == Len),
+            {ok, Term, Data};
+        Err ->
+            Err
+    end;
+simple_read(_State, _) ->
+    {error, not_found}.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
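
Not part of the diff: a sketch of how the new copy/3 op could serve the single segment compaction flow described in docs/internals/COMPACTION.md, copying only the live indexes into a ".compacting" file that then replaces the original. The module name, function name and rename step are illustrative assumptions, and SegFn is assumed to be a string filename; this is not the repository's actual compaction code.

-module(segment_compaction_sketch).
-export([compact_segment/2]).

%% Sketch only: compact one segment by copying its live indexes into a
%% fresh ".compacting" file, then swap it in place of the original.
compact_segment(SegFn, LiveIndexes) when is_list(SegFn) ->
    TmpFn = filename:rootname(SegFn) ++ ".compacting",
    {ok, Dst0} = ra_log_segment:open(TmpFn),
    %% copy/3 reads each listed index from SegFn and appends it to Dst0
    {ok, Dst} = ra_log_segment:copy(Dst0, SegFn, LiveIndexes),
    _ = ra_log_segment:close(Dst),
    %% renaming back to the original name keeps existing references
    %% to the segment valid, as described in COMPACTION.md
    ok = file:rename(TmpFn, SegFn).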

test/ra_log_segment_SUITE.erl

Lines changed: 33 additions & 4 deletions
@@ -38,7 +38,8 @@ all_tests() ->
      corrupted_segment,
      large_segment,
      segref,
-     versions_v1
+     versions_v1,
+     copy
     ].
 
 groups() ->
@@ -460,16 +461,44 @@ read_sparse_append_read(Config) ->
     ra_log_segment:close(R0),
     ok.
 
+copy(Config) ->
+    Dir = ?config(data_dir, Config),
+    Indexes = lists:seq(1, 100),
+    SrcFn = filename:join(Dir, <<"SOURCE1.segment">>),
+    {ok, SrcSeg0} = ra_log_segment:open(SrcFn),
+    SrcSeg1 = lists:foldl(
+                fun (I, S0) ->
+                        {ok, S} = ra_log_segment:append(S0, I, 1, term_to_binary(I)),
+                        S
+                end, SrcSeg0, Indexes),
+    _ = ra_log_segment:close(SrcSeg1),
+
+    Fn = filename:join(Dir, <<"TARGET.segment">>),
+    {ok, Seg0} = ra_log_segment:open(Fn),
+    CopyIndexes = lists:seq(1, 100, 2),
+    {ok, Seg} = ra_log_segment:copy(Seg0, SrcFn, CopyIndexes),
+    ra_log_segment:close(Seg),
+    {ok, R} = ra_log_segment:open(Fn, #{mode => read,
+                                        access_pattern => random}),
+    %% TODO: consider making read_sparse tolerant to missing indexes somehow,
+    %% perhaps detecting if the segment is "sparse"
+    {ok, 2, [_, _]} = ra_log_segment:read_sparse(R, [1, 3],
+                                                 fun (I, T, B, Acc) ->
+                                                     [{I, T, binary_to_term(B)} | Acc]
+                                                 end, []),
+    ra_log_segment:close(R),
+
+    ok.
+
+
+%%% Internal
 write_until_full(Idx, Term, Data, Seg0) ->
     case ra_log_segment:append(Seg0, Idx, Term, Data) of
         {ok, Seg} ->
             write_until_full(Idx+1, Term, Data, Seg);
         {error, full} ->
             Seg0
     end.
-
-
-%%% Internal
 make_data(Size) ->
     term_to_binary(crypto:strong_rand_bytes(Size)).
