99-compile (inline_list_funcs ).
1010
1111-export ([
12- init /5 ,
13- init /7 ,
12+ init /4 ,
13+ init /6 ,
1414 close /1 ,
1515 update_segments /2 ,
1616 schedule_compaction /4 ,
6262% % PUBLIC
6363
6464-spec init (ra_uid (), file :filename (), non_neg_integer (),
65- [segment_ref ()], ra_system : names () ) -> state ().
66- init (UId , Dir , MaxOpen , SegRefs , Names ) ->
67- init (UId , Dir , MaxOpen , random , SegRefs , Names , undefined ).
65+ [segment_ref ()]) -> state ().
66+ init (UId , Dir , MaxOpen , SegRefs ) ->
67+ init (UId , Dir , MaxOpen , random , SegRefs , undefined ).
6868
6969-spec init (ra_uid (), file :filename (), non_neg_integer (),
7070 access_pattern (),
71- [segment_ref ()], ra_system : names (),
71+ [segment_ref ()],
7272 undefined | counters :counters_ref ()) -> state ().
73- init (UId , Dir , MaxOpen , AccessPattern , SegRefs0 , #{}, Counter )
73+ init (UId , Dir , MaxOpen , AccessPattern , SegRefs0 , Counter )
7474 when is_binary (UId ) ->
7575 Cfg = # cfg {uid = UId ,
7676 counter = Counter ,
@@ -146,33 +146,24 @@ update_segments(NewSegmentRefs,
146146-spec schedule_compaction (minor | major , ra :index (),
147147 ra_seq :state (), state ()) ->
148148 [ra_server :effect ()].
149- schedule_compaction (minor , SnapIdx , LiveIndexes , State ) ->
150- case ra_log_segments :segment_refs (State ) of
149+ schedule_compaction (Type , SnapIdx , LiveIndexes ,
150+ #? MODULE {cfg = # cfg {directory = Dir }} = State ) ->
151+ case compactable_segrefs (SnapIdx , State ) of
151152 [] ->
152153 [];
153- [_ | Compactable ] ->
154- % % never compact the current segment
155- % % only take those who have a range lower than the snapshot index as
156- % % we never want to compact more than that
157- SegRefs = lists :takewhile (fun ({_Fn , {_Start , End }}) ->
158- End =< SnapIdx
159- end , lists :reverse (Compactable )),
160- % % TODO: minor compactions should also delete / truncate
154+ SegRefs ->
155+ % % TODO: if LiveIndexes is [] we can just delete all compactable
156+ % % segments
161157 % % segments with completely overwritten indexes
162-
163158 Self = self (),
164159 Fun = fun () ->
165- Delete = lists :foldl (
166- fun ({_Fn , Range } = S , Del ) ->
167- case ra_seq :in_range (Range ,
168- LiveIndexes ) of
169- [] ->
170- [S | Del ];
171- _ ->
172- Del
173- end
174- end , [], SegRefs ),
175- Result = # compaction_result {unreferenced = Delete },
160+ Result = case Type of
161+ minor ->
162+ minor_compaction (SegRefs , LiveIndexes );
163+ major ->
164+ MajConf = #{dir => Dir },
165+ major_compaction (MajConf , SegRefs , LiveIndexes )
166+ end ,
176167 % % need to update the ra_servers list of seg refs _before_
177168 % % the segments can actually be deleted
178169 Self ! {ra_log_event ,
@@ -185,6 +176,125 @@ schedule_compaction(minor, SnapIdx, LiveIndexes, State) ->
185176 end }]
186177 end .
187178
179+ major_compaction (#{dir := Dir }, SegRefs , LiveIndexes ) ->
180+ {Compactable , Delete } =
181+ lists :foldl (fun ({Fn0 , Range } = S ,
182+ {Comps , Del }) ->
183+ case ra_seq :in_range (Range ,
184+ LiveIndexes ) of
185+ [] ->
186+ {Comps , [S | Del ]};
187+ Seq ->
188+ Fn = filename :join (Dir , Fn0 ),
189+ Info = ra_log_segment :info (Fn ),
190+ {[{Info , Seq , S } | Comps ], Del }
191+ end
192+ end , {[], []}, SegRefs ),
193+ % % group compactable
194+ CompactionGroups = compaction_groups (lists :reverse (Compactable ), []),
195+ % ct:pal("CompactionGroups ~p", [CompactionGroups ]),
196+ Compacted0 =
197+ [begin
198+ % % create a new segment with .compacting extension
199+ CompactingFn = filename :join (Dir , change_extension (FstFn0 , " .compacting" )),
200+ % % max_count is the sum of all live indexes for segments in the
201+ % % compaction group
202+ MaxCount = lists :sum ([ra_seq :length (S ) || {_ , S , _ } <- All ]),
203+ % % copy live indexes from all segments in compaction group to
204+ % % the compacting segment
205+ {ok , CompSeg0 } = ra_log_segment :open (CompactingFn ,
206+ #{max_count => MaxCount }),
207+ CompSeg = lists :foldl (
208+ fun ({_ , Live , {F , _ }}, S0 ) ->
209+ {ok , S } = ra_log_segment :copy (S0 , filename :join (Dir , F ),
210+ ra_seq :expand (Live )),
211+ S
212+ end , CompSeg0 , All ),
213+ ok = ra_log_segment :close (CompSeg ),
214+
215+ % % overwrite rename .compacting segment to the original .segment file
216+ CompactedFn = filename :join (Dir , FstFn0 ),
217+ ok = prim_file :rename (CompactingFn , CompactedFn ),
218+
219+ % % perform sym linking of the additional segments in the compaction
220+ % % group
221+ [begin
222+ SymFn = filename :join (Dir , change_extension (Fn , " .link" )),
223+ SegFn = filename :join (Dir , Fn ),
224+ % % make a symlink from the compacted target segment to a new .link
225+ % % where the compacted indexes now can be found
226+ ok = prim_file :make_symlink (CompactedFn , SymFn ),
227+ % % rename to link to replace original segment
228+ ok = prim_file :rename (SymFn , SegFn )
229+ end || {_ , _ , {Fn , _ }} <- Additional ],
230+ % % return the new segref and additional
231+ {ra_log_segment :segref (CompactedFn ), [element (3 , I ) || I <- Additional ]}
232+ end || [{_Info , _ , {FstFn0 , _ }} | Additional ] = All
233+ <- CompactionGroups ],
234+
235+
236+ {Compacted , AddDelete } = lists :unzip (Compacted0 ),
237+
238+ # compaction_result {unreferenced = Delete ,
239+ linked = lists :append (AddDelete ),
240+ compacted = Compacted }.
241+
242+ change_extension (SegFn , NewExt ) ->
243+ ra_lib :to_list (filename :basename (SegFn , " .segment" )) ++ NewExt .
244+
245+ compaction_groups ([], Groups ) ->
246+ lists :reverse (Groups );
247+ compaction_groups (Infos , Groups ) ->
248+ case take_group (Infos , #{max_count => 128 }, []) of
249+ {Group , RemInfos } ->
250+ compaction_groups (RemInfos , [Group | Groups ])
251+ end .
252+
253+ take_group ([], _ , Acc ) ->
254+ {lists :reverse (Acc ), []};
255+ take_group ([{#{num_entries := NumEnts }, Live , {_ , _ }} = E | Rem ] = All ,
256+ #{max_count := Mc }, Acc ) ->
257+ Num = ra_seq :length (Live ),
258+ case Num < NumEnts div 2 of
259+ true ->
260+ case Mc - Num < 0 of
261+ true ->
262+ {lists :reverse (Acc ), All };
263+ false ->
264+ take_group (Rem , #{max_count => Mc - Num }, [E | Acc ])
265+ end ;
266+ % % skip this secment
267+ false when Acc == [] ->
268+ take_group (Rem , #{max_count => Mc }, Acc );
269+ false ->
270+ {lists :reverse (Acc ), Rem }
271+ end .
272+
273+
274+ minor_compaction (SegRefs , LiveIndexes ) ->
275+ Delete = lists :foldl (fun ({_Fn , Range } = S , Del ) ->
276+ case ra_seq :in_range (Range ,
277+ LiveIndexes ) of
278+ [] ->
279+ [S | Del ];
280+ _ ->
281+ Del
282+ end
283+ end , [], SegRefs ),
284+ # compaction_result {unreferenced = Delete }.
285+
286+ compactable_segrefs (SnapIdx , State ) ->
287+ case ra_log_segments :segment_refs (State ) of
288+ [] ->
289+ [];
290+ [_ | Compactable ] ->
291+ % % never compact the current segment
292+ % % only take those who have a range lower than the snapshot index as
293+ % % we never want to compact more than that
294+ lists :takewhile (fun ({_Fn , {_Start , End }}) ->
295+ End =< SnapIdx
296+ end , lists :reverse (Compactable ))
297+ end .
188298
189299- spec handle_compaction_result (# compaction_result {}, state ()) ->
190300 {state (), [ra_server :effect ()]}.
@@ -194,9 +304,11 @@ handle_compaction_result(#compaction_result{unreferenced = Unreferenced,
194304 #? STATE {cfg = # cfg {directory = Dir },
195305 open_segments = Open0 ,
196306 segment_refs = SegRefs0 } = State ) ->
197- SegmentRefs0 = ra_lol :to_list (SegRefs0 ),
198- SegmentRefs = lists :usort (((SegmentRefs0 -- Unreferenced ) -- Linked )
199- ++ Compacted ),
307+ SegRefs = ra_lol :to_list (SegRefs0 ) -- (Linked -- Unreferenced ),
308+ SegmentRefs0 = maps :merge (
309+ maps :from_list (SegRefs ),
310+ maps :from_list (Compacted )),
311+ SegmentRefs = maps :to_list (maps :iterator (SegmentRefs0 , ordered )),
200312 Open = ra_flru :evict_all (Open0 ),
201313 Fun = fun () ->
202314 [prim_file :delete (filename :join (Dir , F ))
0 commit comments