Skip to content

Commit e35e265

Browse files
committed
Add strategy for automatically triggering major compactions.
Two strategies now exist: 1. manual - major compactions are never triggered automatically. 2. {num_minors, Num} - once Num minor compactions have occurred since the last major compaction, the next requested minor compaction is promoted to a major one. Default: {num_minors, 8}
1 parent e3ea015 commit e35e265

File tree

7 files changed

+112
-25
lines changed

7 files changed

+112
-25
lines changed

src/ra.hrl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@
223223
-define(SEGMENT_MAX_ENTRIES, 4096).
224224
-define(SEGMENT_MAX_PENDING, 1024).
225225
-define(SEGMENT_MAX_SIZE_B, 64_000_000). %% set an upper limit on segment sizing
226+
-define(DEF_MAJOR_COIMPACTION_STRAT, {num_minors, 8}).
226227

227228
%% logging shim
228229
-define(DEBUG_IF(Bool, Fmt, Args),
@@ -285,6 +286,8 @@
285286
"Number of checkpoint bytes written"},
286287
{checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter,
287288
"Number of checkpoints promoted to snapshots"},
289+
{minor_compactions, ?C_RA_LOG_COMPACTIONS_MINOR_COUNT, counter,
290+
"Number of requested minor compactions"},
288291
{major_compactions, ?C_RA_LOG_COMPACTIONS_MAJOR_COUNT, counter,
289292
"Number of requested major compactions"},
290293
{major_compaction_segments_written,
@@ -311,10 +314,11 @@
311314
-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13).
312315
-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14).
313316
-define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15).
314-
-define(C_RA_LOG_COMPACTIONS_MAJOR_COUNT, 16).
315-
-define(C_RA_LOG_COMPACTIONS_SEGMENTS_WRITTEN, 17).
316-
-define(C_RA_LOG_COMPACTIONS_SEGMENTS_COMPACTED, 18).
317-
-define(C_RA_LOG_RESERVED, 19).
317+
-define(C_RA_LOG_COMPACTIONS_MINOR_COUNT, 16).
318+
-define(C_RA_LOG_COMPACTIONS_MAJOR_COUNT, 17).
319+
-define(C_RA_LOG_COMPACTIONS_SEGMENTS_WRITTEN, 18).
320+
-define(C_RA_LOG_COMPACTIONS_SEGMENTS_COMPACTED, 19).
321+
-define(C_RA_LOG_RESERVED, 20).
318322

319323
-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
320324
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).

src/ra_log.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@
141141
snapshot_module => module(),
142142
counter => counters:counters_ref(),
143143
initial_access_pattern => sequential | random,
144-
max_checkpoints => non_neg_integer()}.
144+
max_checkpoints => non_neg_integer(),
145+
major_compaction_strategy =>
146+
ra_log_segments:major_compaction_strategy()}.
145147

146148

147149
-type overview() ::
@@ -233,7 +235,10 @@ init(#{uid := UId,
233235
SegRefs = my_segrefs(UId, SegWriter),
234236
SegmentMaxCount = maps:get(segment_max_entries, Conf, ?SEGMENT_MAX_ENTRIES),
235237
SegmentMaxSize = maps:get(segment_max_size_bytes, Conf, ?SEGMENT_MAX_SIZE_B),
238+
MajorCompStrat = maps:get(major_compaction_strategy, Conf,
239+
?DEF_MAJOR_COIMPACTION_STRAT),
236240
CompConf = #{max_size => SegmentMaxSize,
241+
major_strategy => MajorCompStrat,
237242
max_count => SegmentMaxCount},
238243
Reader = ra_log_segments:init(UId, Dir, MaxOpen, AccessPattern, SegRefs,
239244
Counter, CompConf, LogId),

src/ra_log_segment_writer.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ init([#{data_dir := DataDir,
114114
name := SegWriterName,
115115
system := System} = Conf]) ->
116116
process_flag(trap_exit, true),
117-
CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS, #{ra_system => System, module => ?MODULE}),
117+
CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS,
118+
#{ra_system => System,
119+
module => ?MODULE}),
118120
SegmentConf = maps:get(segment_conf, Conf, #{}),
119121
maybe_upgrade_segment_file_names(System, DataDir),
120122
{ok, #state{system = System,

src/ra_log_segments.erl

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,15 @@
3939

4040
-define(SYMLINK_KEEPFOR_S, 60).
4141

42+
%% type for configuring automatic major compaction strategies
43+
-type major_compaction_strategy() :: manual |
44+
{num_minors, pos_integer()}.
45+
4246
-type compaction_conf() :: #{max_count => non_neg_integer(),
43-
max_size => non_neg_integer()}.
47+
max_size => non_neg_integer(),
48+
major_strategy => major_compaction_strategy()}.
49+
%% hardly ever used anymore, the sequential access pattern is only activated
50+
%% during recovery
4451
-type access_pattern() :: sequential | random.
4552
%% holds static or rarely changing fields
4653
-record(cfg, {uid :: ra_uid(),
@@ -58,7 +65,7 @@
5865
segment_refs :: ra_lol:state(),
5966
open_segments :: ra_flru:state(),
6067
compaction :: undefined | major | minor,
61-
next_compaction :: undefined | major | minor
68+
minor_compaction_count = 0 :: non_neg_integer()
6269
}).
6370

6471
-record(compaction_result,
@@ -74,7 +81,8 @@
7481
-export_type([
7582
state/0,
7683
read_plan/0,
77-
read_plan_options/0
84+
read_plan_options/0,
85+
major_compaction_strategy/0
7886
]).
7987

8088
%% PUBLIC
@@ -85,7 +93,8 @@
8593
map(),
8694
unicode:chardata()) -> state().
8795
init(UId, Dir, MaxOpen, AccessPattern, SegRefs0, Counter, CompConf, LogId)
88-
when is_binary(UId) ->
96+
when is_binary(UId) andalso
97+
is_map(CompConf) ->
8998
Cfg = #cfg{uid = UId,
9099
log_id = LogId,
91100
counter = Counter,
@@ -173,27 +182,42 @@ update_segments(NewSegmentRefs, #?STATE{open_segments = Open0,
173182
-spec schedule_compaction(minor | major, ra:index(),
174183
ra_seq:state(), state()) ->
175184
{state(), [ra_server:effect()]}.
185+
schedule_compaction(minor, SnapIdx, LiveIndexes,
186+
#?MODULE{cfg =
187+
#cfg{compaction_conf =
188+
#{major_strategy :=
189+
{num_minors, NumMinors}}},
190+
minor_compaction_count = MinorCount} = State)
191+
when MinorCount >= NumMinors ->
192+
%% promote to major compaction
193+
schedule_compaction(major, SnapIdx, LiveIndexes, State);
176194
schedule_compaction(Type, SnapIdx, LiveIndexes,
177195
#?MODULE{cfg = #cfg{log_id = LogId,
178196
compaction_conf = CompConf,
179197
directory = Dir} = Cfg,
198+
minor_compaction_count = MinorCompCnt,
180199
compaction = undefined} = State) ->
181200
case compactable_segrefs(SnapIdx, State) of
182201
[] ->
183202
{State, []};
184203
SegRefs when LiveIndexes == [] ->
204+
185205
%% if LiveIndexes is [] we can just delete all compactable
186206
%% segment refs
187207
Unreferenced = [F || {F, _} <- SegRefs],
208+
ok = incr_counter(Cfg, ?C_RA_LOG_COMPACTIONS_MINOR_COUNT, 1),
188209
Result = #compaction_result{unreferenced = Unreferenced},
189-
{State#?MODULE{compaction = minor},
210+
{State#?MODULE{compaction = minor,
211+
minor_compaction_count = MinorCompCnt + 1},
190212
[{next_event,
191213
{ra_log_event, {compaction_result, Result}}}]};
192214
SegRefs when Type == minor ->
193-
%% TODO evaluate if minor compactions are fast enough to run
215+
%% TODO: evaluate if minor compactions are fast enough to run
194216
%% in server process
217+
ok = incr_counter(Cfg, ?C_RA_LOG_COMPACTIONS_MINOR_COUNT, 1),
195218
Result = minor_compaction(SegRefs, LiveIndexes),
196-
{State#?MODULE{compaction = minor},
219+
{State#?MODULE{compaction = minor,
220+
minor_compaction_count = MinorCompCnt + 1},
197221
[{next_event,
198222
{ra_log_event, {compaction_result, Result}}}]};
199223
SegRefs ->
@@ -213,7 +237,8 @@ schedule_compaction(Type, SnapIdx, LiveIndexes,
213237
ok
214238
end,
215239

216-
{State#?MODULE{compaction = major},
240+
{State#?MODULE{compaction = major,
241+
minor_compaction_count = 0},
217242
[{bg_work, Fun,
218243
fun (Err) ->
219244
%% send an empty compaction result to ensure the
@@ -385,9 +410,11 @@ fetch_term(Idx, #?STATE{cfg = #cfg{} = Cfg} = State0) ->
385410

386411
-spec info(state()) -> map().
387412
info(#?STATE{cfg = #cfg{} = _Cfg,
413+
minor_compaction_count = MinorCount,
388414
open_segments = Open} = State) ->
389415
#{max_size => ra_flru:max_size(Open),
390-
num_segments => segment_ref_count(State)}.
416+
num_segments => segment_ref_count(State),
417+
minor_compactions_count => MinorCount}.
391418

392419
-spec purge_symlinks(file:filename_all(),
393420
OlderThanSec :: non_neg_integer()) -> ok.

test/ra_kv_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ snapshot_replication_interrupted(_Config) ->
6969
#{members => Members}),
7070
ra:transfer_leadership(KvId, KvId),
7171
Data = crypto:strong_rand_bytes(100_000),
72-
%% write 10k entries of the same key
72+
%% write 10k entries, each with a different key
7373
[{ok, #{}} = ra_kv:put(KvId, term_to_binary(I), Data, 5000)
7474
|| I <- lists:seq(1, 10_000)],
7575
?assertMatch({ok, #{machine := #{num_keys := 10_000}}, KvId},
@@ -119,7 +119,7 @@ snapshot_replication(_Config) ->
119119
#{members => Members}),
120120
ra:transfer_leadership(KvId, KvId),
121121
{ok, #{}} = ra_kv:put(KvId, <<"k1">>, <<"k1-value01">>, 5000),
122-
%% write 10k entries of the same key
122+
%% write 5k entries, each with a different key
123123
[{ok, #{}} = ra_kv:put(KvId, integer_to_binary(I), I, 5000)
124124
|| I <- lists:seq(1, 5000)],
125125

test/ra_log_2_SUITE.erl

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -933,10 +933,6 @@ recovery(Config) ->
933933
{20, 3} = ra_log:last_index_term(Log5),
934934
Log6 = validate_fold(1, 4, 1, Log5),
935935
Log7 = validate_fold(5, 14, 2, Log6),
936-
% debugger:start(),
937-
% int:i(ra_log),
938-
% int:break(ra_log, 413),
939-
940936
Log8 = validate_fold(15, 20, 3, Log7),
941937
ra_log:close(Log8),
942938

test/ra_log_segments_SUITE.erl

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
-compile(nowarn_export_all).
1010
-compile(export_all).
1111

12+
-include("src/ra.hrl").
1213
-include_lib("common_test/include/ct.hrl").
1314
-include_lib("eunit/include/eunit.hrl").
1415
-include_lib("kernel/include/file.hrl").
@@ -35,7 +36,8 @@ all_tests() ->
3536
minor,
3637
overwrite,
3738
result_after_segments,
38-
result_after_segments_overwrite
39+
result_after_segments_overwrite,
40+
major_strategy_num_minors
3941
].
4042

4143
groups() ->
@@ -49,6 +51,7 @@ init_per_testcase(TestCase, Config) ->
4951
ok = ra_lib:make_dir(Dir),
5052
CompConf = #{max_count => 128,
5153
max_size => 128_000},
54+
ra_counters:init(),
5255
[{uid, atom_to_binary(TestCase, utf8)},
5356
{comp_conf, CompConf},
5457
{test_case, TestCase},
@@ -609,13 +612,60 @@ major(Config) ->
609612
length(Files) == 5
610613
end}
611614
],
612-
613615
SegConf = #{max_count => 128},
614616
Segs0 = ra_log_segments_init(Config, Dir, seg_refs(Dir)),
615617
run_scenario([{seg_conf, SegConf} | Config], Segs0, Scen),
616618

617619
ok.
618620

621+
major_strategy_num_minors(Config) ->
622+
SegConf = #{max_count => 16},
623+
Dir = ?config(dir, Config),
624+
Entries1 = lists:seq(1, 32),
625+
Entries2 = lists:seq(33, 64),
626+
Entries3 = lists:seq(65, 96),
627+
Live1 = ra_seq:from_list(lists:seq(1, 32, 3)),
628+
Live2 = ra_seq:from_list(lists:seq(1, 64, 3)),
629+
Live3 = ra_seq:from_list(lists:seq(1, 96, 3)),
630+
ct:pal("Live1 ~p", [Live1]),
631+
Scen =
632+
[
633+
{entries, 1, Entries1},
634+
{assert, 1, lists:seq(1, 32)},
635+
{assert, fun (S) ->
636+
SegRefs = ra_log_segments:segment_refs(S),
637+
length(SegRefs) == 2
638+
end},
639+
{minor, 30, Live1},
640+
handle_compaction_result,
641+
{assert, 1, lists:seq(1, 32, 3)},
642+
643+
{entries, 1, Entries2},
644+
{minor, 60, Live2},
645+
handle_compaction_result,
646+
{entries, 1, Entries3},
647+
{minor, 90, Live3},
648+
handle_compaction_result
649+
650+
],
651+
652+
Counters = ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
653+
CompConf = #{max_count => 16,
654+
max_size => 128_000,
655+
major_strategy => {num_minors, 2}},
656+
657+
Segs0 = ra_log_segments_init(Config, Dir, seg_refs(Dir), CompConf, Counters),
658+
_Segs = run_scenario([{seg_conf, SegConf} | Config], Segs0, Scen),
659+
?assertMatch(#{major_compactions := 1,
660+
minor_compactions := 2},
661+
ra_counters:counters(?FUNCTION_NAME,
662+
[minor_compactions,
663+
major_compactions])),
664+
665+
ok.
666+
667+
668+
619669
%% Helpers
620670

621671
open_last_segment(Config, SegConf) ->
@@ -766,10 +816,13 @@ with_ext(Fn, Ext) when is_binary(Fn) andalso is_list(Ext) ->
766816
<<(filename:rootname(Fn))/binary, (ra_lib:to_binary(Ext))/binary>>.
767817

768818
ra_log_segments_init(Config, Dir, SegRefs) ->
769-
UId = ?config(uid, Config),
770819
CompConf = ?config(comp_conf, Config),
820+
ra_log_segments_init(Config, Dir, SegRefs, CompConf, undefined).
821+
822+
ra_log_segments_init(Config, Dir, SegRefs, CompConf, Counters) ->
823+
UId = ?config(uid, Config),
771824
ra_log_segments:init(UId, Dir, 1, random,
772-
SegRefs, undefined,
825+
SegRefs, Counters,
773826
CompConf, "").
774827

775828
do_compaction(Dir, CompactingFn, Live, All) ->

0 commit comments

Comments
 (0)