Skip to content

Commit 234ba96

Browse files
committed
compaction counters
1 parent 6217867 commit 234ba96

File tree

7 files changed

+57
-12
lines changed

7 files changed

+57
-12
lines changed

src/ra.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@
7373
member_overview/1,
7474
member_overview/2,
7575
key_metrics/1,
76-
key_metrics/2
76+
key_metrics/2,
77+
trigger_compaction/1
7778
]).
7879

7980
-define(START_TIMEOUT, ?DEFAULT_TIMEOUT).
@@ -1198,6 +1199,13 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() ->
11981199
key_metrics({_, N} = ServerId, Timeout) ->
11991200
erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId], Timeout).
12001201

1202+
%% @doc Potentially triggers a major compaction for the provided member
1203+
%% @param ServerId the Ra server to send the request to
1204+
%% @end
1205+
-spec trigger_compaction(ra_server_id()) -> ok.
1206+
trigger_compaction(ServerRef) ->
1207+
gen_statem:cast(ServerRef, {ra_log_event, major_compaction}).
1208+
12011209
%% internal
12021210

12031211
-spec usr(UserCommand, ReplyMode) -> Command when

src/ra.hrl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,17 @@
278278
"Number of checkpoint bytes written"},
279279
{checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter,
280280
"Number of checkpoints promoted to snapshots"},
281+
{major_compactions, ?C_RA_LOG_COMPACTIONS_MAJOR_COUNT, counter,
282+
"Number of requested major compactions"},
283+
{major_compaction_segments_written,
284+
?C_RA_LOG_COMPACTIONS_SEGMENTS_WRITTEN, counter,
285+
"Number of segments written during major compactions"},
286+
{major_compaction_segments_compacted,
287+
?C_RA_LOG_COMPACTIONS_SEGMENTS_COMPACTED, counter,
288+
"Number of segments compacted during major compactions"},
281289
{reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"}
282290
]).
291+
283292
-define(C_RA_LOG_WRITE_OPS, 1).
284293
-define(C_RA_LOG_WRITE_RESENDS, 2).
285294
-define(C_RA_LOG_READ_OPS, 3).
@@ -295,7 +304,10 @@
295304
-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13).
296305
-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14).
297306
-define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15).
298-
-define(C_RA_LOG_RESERVED, 16).
307+
-define(C_RA_LOG_COMPACTIONS_MAJOR_COUNT, 16).
308+
-define(C_RA_LOG_COMPACTIONS_SEGMENTS_WRITTEN, 17).
309+
-define(C_RA_LOG_COMPACTIONS_SEGMENTS_COMPACTED, 18).
310+
-define(C_RA_LOG_RESERVED, 19).
299311

300312
-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
301313
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).

src/ra_lib.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,9 @@ retry(Func, Attempt, Sleep) ->
344344
ok;
345345
true ->
346346
ok;
347-
_ ->
347+
_Err ->
348348
timer:sleep(Sleep),
349-
retry(Func, Attempt - 1)
349+
retry(Func, Attempt - 1, Sleep)
350350
end.
351351

352352
-spec write_file(file:name_all(), iodata()) ->

src/ra_log.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@
7171
{snapshot_written, ra_idxterm(),
7272
LiveIndexes :: ra_seq:state(),
7373
ra_snapshot:kind()} |
74+
{compaction_result, term()} |
75+
major_compaction |
7476
{down, pid(), term()}.
7577

7678
-type event() :: {ra_log_event, event_body()}.
@@ -818,6 +820,17 @@ handle_event({compaction_result, Result},
818820
#?MODULE{reader = Reader0} = State) ->
819821
{Reader, Effs} = ra_log_segments:handle_compaction_result(Result, Reader0),
820822
{State#?MODULE{reader = Reader}, Effs};
823+
handle_event(major_compaction, #?MODULE{reader = Reader0,
824+
live_indexes = LiveIndexes,
825+
snapshot_state = SS} = State) ->
826+
case ra_snapshot:current(SS) of
827+
{SnapIdx, _} ->
828+
Effs = ra_log_segments:schedule_compaction(major,SnapIdx,
829+
LiveIndexes, Reader0),
830+
{State, Effs};
831+
_ ->
832+
{State, []}
833+
end;
821834
handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
822835
#?MODULE{cfg = #cfg{uid = UId,
823836
names = Names} = Cfg,

src/ra_log_segments.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ update_segments(NewSegmentRefs, #?STATE{open_segments = Open0,
150150
[ra_server:effect()].
151151
schedule_compaction(Type, SnapIdx, LiveIndexes,
152152
#?MODULE{cfg = #cfg{log_id = LogId,
153-
directory = Dir}} = State) ->
153+
directory = Dir} = Cfg} = State) ->
154154
case compactable_segrefs(SnapIdx, State) of
155155
[] ->
156156
[];
@@ -170,6 +170,7 @@ schedule_compaction(Type, SnapIdx, LiveIndexes,
170170
SegRefs ->
171171
Self = self(),
172172
Fun = fun () ->
173+
ok = incr_counter(Cfg, ?C_RA_LOG_COMPACTIONS_MAJOR_COUNT, 1),
173174
MajConf = #{dir => Dir},
174175
Result = major_compaction(MajConf, SegRefs,
175176
LiveIndexes),
@@ -200,7 +201,7 @@ handle_compaction_result(#compaction_result{unreferenced = [],
200201
handle_compaction_result(#compaction_result{unreferenced = Unreferenced,
201202
linked = Linked,
202203
compacted = Compacted},
203-
#?STATE{cfg = #cfg{directory = Dir},
204+
#?STATE{cfg = #cfg{directory = Dir} = Cfg,
204205
open_segments = Open0,
205206
segment_refs = SegRefs0} = State) ->
206207
SegRefs1 = maps:from_list(ra_lol:to_list(SegRefs0)),
@@ -214,6 +215,10 @@ handle_compaction_result(#compaction_result{unreferenced = Unreferenced,
214215
|| F <- Unreferenced],
215216
ok
216217
end,
218+
ok = incr_counter(Cfg, ?C_RA_LOG_COMPACTIONS_SEGMENTS_WRITTEN,
219+
length(Compacted)),
220+
ok = incr_counter(Cfg, ?C_RA_LOG_COMPACTIONS_SEGMENTS_COMPACTED,
221+
length(Linked) + length(Compacted)),
217222
{State#?MODULE{segment_refs = ra_lol:from_list(fun seg_ref_gt/2,
218223
SegmentRefs),
219224
open_segments = Open},

test/ra_kv_SUITE.erl

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33
-compile(nowarn_export_all).
44
-compile(export_all).
55

6-
-export([
7-
]).
8-
9-
-include_lib("src/ra.hrl").
106
-include_lib("common_test/include/ct.hrl").
117
-include_lib("eunit/include/eunit.hrl").
128

139
-define(SYS, default).
10+
1411
%%%===================================================================
1512
%%% Common Test callbacks
1613
%%%===================================================================
@@ -118,7 +115,6 @@ basics(_Config) ->
118115
{ok, {Reads3, _}} = ra_server_proc:read_entries(KvId2, [LastIdx | Live],
119116
undefined, 1000),
120117
ct:pal("ReadRes3 ~p", [Reads3]),
121-
% ct:pal("overview3 ~p", [ra:member_overview(KvId2)]),
122118
?assertEqual(3, map_size(Reads3)),
123119

124120
%% TODO: test recovery of kv
@@ -128,4 +124,16 @@ basics(_Config) ->
128124
undefined, 1000),
129125

130126
?assertEqual(3, map_size(Reads4)),
127+
ra:trigger_compaction(KvId),
128+
%% wait for compaction by querying counters
129+
ok = ra_lib:retry(
130+
fun () ->
131+
#{major_compactions := Maj} =
132+
ra_counters:counters(KvId, [major_compactions]),
133+
Maj == 1
134+
end, 10, 100),
135+
{ok, {Reads5, _}} = ra_server_proc:read_entries(KvId, [LastIdx | Live],
136+
undefined, 1000),
137+
?assertEqual(Reads4, Reads5),
138+
ct:pal("counters ~p", [ra_counters:overview(KvId)]),
131139
ok.

test/ra_log_2_SUITE.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,6 @@ read_one(Config) ->
256256
ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
257257
Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME)}),
258258
Log1 = append_n(1, 2, 1, Log0),
259-
% Log1 = ra_log:append({1, 1, <<1:64/integer>>}, Log0),
260259
% ensure the written event is delivered
261260
Log2 = deliver_all_log_events(Log1, 200),
262261
{[_], Log} = ra_log_take(1, 1, Log2),

0 commit comments

Comments
 (0)