Skip to content

Commit 83d84bc

Browse files
committed
wip
1 parent 3214e42 commit 83d84bc

File tree

3 files changed

+81
-92
lines changed

3 files changed

+81
-92
lines changed

src/ra_log.erl

Lines changed: 11 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,6 @@
103103
{cfg = #cfg{},
104104
%% mutable data below
105105
range :: ra:range(),
106-
% first_index = -1 :: ra_index(),
107-
% last_index = -1 :: -1 | ra_index(),
108106
last_term = 0 :: ra_term(),
109107
last_written_index_term = {0, 0} :: ra_idxterm(),
110108
snapshot_state :: ra_snapshot:state(),
@@ -817,30 +815,10 @@ handle_event({segments, TidRanges, NewSegs},
817815
ok
818816
end,
819817
{State, [{bg_work, Fun, fun (_Err) -> ok end}]};
820-
handle_event({segments_to_be_deleted, SegRefs},
821-
#?MODULE{cfg = #cfg{uid = UId,
822-
log_id = LogId,
823-
directory = Dir,
824-
counter = Counter,
825-
names = Names},
826-
reader = Reader} = State) ->
827-
ActiveSegs = ra_log_segments:segment_refs(Reader) -- SegRefs,
828-
#{max_size := MaxOpenSegments} = ra_log_segments:info(Reader),
829-
% close all open segments
830-
ok = ra_log_segments:close(Reader),
831-
?DEBUG("~ts: ~b obsolete segments - remaining: ~b",
832-
[LogId, length(SegRefs), length(ActiveSegs)]),
833-
%% open a new segment with the new max open segment value
834-
Fun = fun () ->
835-
[prim_file:delete(filename:join(Dir, F))
836-
|| {F, _} <- SegRefs],
837-
ok
838-
end,
839-
{State#?MODULE{reader = ra_log_segments:init(UId, Dir, MaxOpenSegments,
840-
random,
841-
ActiveSegs, Names, Counter)},
842-
843-
[{bg_work, Fun, fun (_Err) -> ok end}]};
818+
handle_event({compaction_result, Result},
819+
#?MODULE{reader = Reader0} = State) ->
820+
{Reader, Effs} = ra_log_segments:handle_compaction_result(Result, Reader0),
821+
{State#?MODULE{reader = Reader}, Effs};
844822
handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
845823
#?MODULE{cfg = #cfg{uid = UId,
846824
names = Names} = Cfg,
@@ -904,7 +882,9 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
904882
live_indexes = LiveIndexes,
905883
current_snapshot = Snap,
906884
snapshot_state = SnapState},
907-
CompEffs = schedule_compaction(SnapIdx, State),
885+
CompEffs = ra_log_segments:schedule_compaction(minor, SnapIdx,
886+
LiveIndexes,
887+
State#?MODULE.reader),
908888
Effects = CompEffs ++ Effects0,
909889
{State, Effects};
910890
checkpoint ->
@@ -1021,7 +1001,9 @@ install_snapshot({SnapIdx, SnapTerm} = IdxTerm, MacMod, LiveIndexes,
10211001
live_indexes = LiveIndexes,
10221002
mem_table = Mt,
10231003
last_written_index_term = IdxTerm},
1024-
CompEffs = schedule_compaction(SnapIdx, State),
1004+
CompEffs = ra_log_segments:schedule_compaction(minor, SnapIdx,
1005+
LiveIndexes,
1006+
State#?MODULE.reader),
10251007
{ok, State, CompEffs ++ CPEffects}.
10261008

10271009

@@ -1326,52 +1308,7 @@ release_resources(MaxOpenSegments,
13261308
%%% Local functions
13271309

13281310

1329-
schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{uid = _UId,
1330-
segment_writer = _SegWriter},
1331-
live_indexes = LiveIndexes,
1332-
reader = Reader0}) ->
1333-
case ra_log_segments:segment_refs(Reader0) of
1334-
[] ->
1335-
[];
1336-
[_ | Compactable] ->
1337-
%% never compact the current segment
1338-
%% only take those who have a range lower than the snapshot index as
1339-
%% we never want to compact more than that
1340-
SegRefs = lists:takewhile(fun ({_Fn, {_Start, End}}) ->
1341-
End =< SnapIdx
1342-
end, lists:reverse(Compactable)),
1343-
% SnapDir = ra_snapshot:current_snapshot_dir(SnapState),
1344-
1345-
%% TODO: minor compactions should also delete / truncate
1346-
%% segments with completely overwritten indexes
1347-
1348-
Self = self(),
1349-
Fun =
1350-
fun () ->
1351-
Delete = lists:foldl(
1352-
fun({_Fn, Range} = S, Del) ->
1353-
case ra_seq:in_range(Range,
1354-
LiveIndexes) of
1355-
[] ->
1356-
[S | Del];
1357-
_ ->
1358-
Del
1359-
end
1360-
end, [], SegRefs),
1361-
%% need to update the ra_servers list of seg refs _before_
1362-
%% the segments can actually be deleted
1363-
Self ! {ra_log_event,
1364-
{segments_to_be_deleted, Delete}},
1365-
ok
1366-
end,
1367-
1368-
[{bg_work, Fun, fun (_Err) ->
1369-
?WARN("bgwork err ~p", [_Err]), ok
1370-
end}]
1371-
end.
1372-
1373-
1374-
%% unly used by resend to wal functionality and doesn't update the mem table
1311+
%% only used by resend to wal functionality and doesn't update the mem table
13751312
wal_rewrite(#?MODULE{cfg = #cfg{uid = UId,
13761313
wal = Wal} = Cfg,
13771314
range = _Range} = State,

src/ra_log_segments.erl

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
init/7,
1414
close/1,
1515
update_segments/2,
16-
handle_compaction/2,
16+
schedule_compaction/4,
17+
handle_compaction_result/2,
1718
segment_refs/1,
1819
segment_ref_count/1,
1920
range/1,
@@ -137,24 +138,75 @@ update_segments(NewSegmentRefs,
137138
open_segments = Open},
138139
OverwrittenSegments}.
139140

140-
-record(log_compaction_result,
141-
{%range :: ra:range(),
142-
unreferenced :: [segment_ref()],
143-
linked :: [segment_ref()],
144-
compacted :: [segment_ref()]}).
145-
146-
-spec handle_compaction(#log_compaction_result{}, state()) -> state().
147-
handle_compaction(#log_compaction_result{unreferenced = Deleted,
148-
linked = Linked,
149-
compacted = Compacted},
150-
#?STATE{open_segments = Open0,
151-
segment_refs = SegRefs0} = State) ->
141+
-record(compaction_result,
142+
{unreferenced = [] :: [segment_ref()],
143+
linked = [] :: [segment_ref()],
144+
compacted = [] :: [segment_ref()]}).
145+
146+
-spec schedule_compaction(minor | major, ra:index(),
147+
ra_seq:state(), state()) ->
148+
[ra_server:effect()].
149+
schedule_compaction(minor, SnapIdx, LiveIndexes, State) ->
150+
case ra_log_segments:segment_refs(State) of
151+
[] ->
152+
[];
153+
[_ | Compactable] ->
154+
%% never compact the current segment
155+
%% only take those who have a range lower than the snapshot index as
156+
%% we never want to compact more than that
157+
SegRefs = lists:takewhile(fun ({_Fn, {_Start, End}}) ->
158+
End =< SnapIdx
159+
end, lists:reverse(Compactable)),
160+
%% TODO: minor compactions should also delete / truncate
161+
%% segments with completely overwritten indexes
162+
163+
Self = self(),
164+
Fun = fun () ->
165+
Delete = lists:foldl(
166+
fun({_Fn, Range} = S, Del) ->
167+
case ra_seq:in_range(Range,
168+
LiveIndexes) of
169+
[] ->
170+
[S | Del];
171+
_ ->
172+
Del
173+
end
174+
end, [], SegRefs),
175+
Result = #compaction_result{unreferenced = Delete},
176+
%% need to update the ra_servers list of seg refs _before_
177+
%% the segments can actually be deleted
178+
Self ! {ra_log_event,
179+
{compaction_result, Result}},
180+
ok
181+
end,
182+
183+
[{bg_work, Fun, fun (_Err) ->
184+
?WARN("bgwork err ~p", [_Err]), ok
185+
end}]
186+
end.
187+
188+
189+
-spec handle_compaction_result(#compaction_result{}, state()) ->
190+
{state(), [ra_server:effect()]}.
191+
handle_compaction_result(#compaction_result{unreferenced = Unreferenced,
192+
linked = Linked,
193+
compacted = Compacted},
194+
#?STATE{cfg = #cfg{directory = Dir},
195+
open_segments = Open0,
196+
segment_refs = SegRefs0} = State) ->
152197
SegmentRefs0 = ra_lol:to_list(SegRefs0),
153-
SegmentRefs = lists:usort(((SegmentRefs0 -- Deleted) -- Linked) ++ Compacted),
198+
SegmentRefs = lists:usort(((SegmentRefs0 -- Unreferenced) -- Linked)
199+
++ Compacted),
154200
Open = ra_flru:evict_all(Open0),
155-
State#?MODULE{segment_refs = ra_lol:from_list(fun seg_ref_gt/2,
156-
lists:reverse(SegmentRefs)),
157-
open_segments = Open}.
201+
Fun = fun () ->
202+
[prim_file:delete(filename:join(Dir, F))
203+
|| {F, _} <- Unreferenced],
204+
ok
205+
end,
206+
{State#?MODULE{segment_refs = ra_lol:from_list(fun seg_ref_gt/2,
207+
SegmentRefs),
208+
open_segments = Open},
209+
[{bg_work, Fun, fun (_Err) -> ok end}]}.
158210

159211

160212
-spec update_first_index(ra_index(), state()) ->

src/ra_server.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@
178178
%% used for tracking valid leader messages
179179
{record_leader_msg, ra_server_id()} |
180180
start_election_timeout |
181-
{bg_work, fun(() -> ok) | mfargs()}.
181+
{bg_work, fun(() -> ok) | mfargs(), fun()}.
182182

183183
-type effects() :: [effect()].
184184

0 commit comments

Comments
 (0)