1111
1212-include_lib (" kernel/include/file.hrl" ).
1313-export ([
14- init /7 ,
14+ init /8 ,
1515 close /1 ,
1616 update_segments /2 ,
1717 schedule_compaction /4 ,
2727 exec_read_plan /6 ,
2828 fetch_term /2 ,
2929 info /1 ,
30- purge_symlinks /2
30+ purge_symlinks /2 ,
31+ compaction_conf /1
3132 ]).
3233
3334-include (" ra.hrl" ).
3637
3738-define (SYMLINK_KEEPFOR_S , 60 ).
3839
40+ -type compaction_conf () :: #{max_count => non_neg_integer (),
41+ max_size => non_neg_integer ()}.
3942-type access_pattern () :: sequential | random .
4043% % holds static or rarely changing fields
4144-record (cfg , {uid :: ra_uid (),
4245 log_id = " " :: unicode :chardata (),
4346 counter :: undefined | counters :counters_ref (),
4447 directory :: file :filename (),
45- access_pattern = random :: access_pattern ()
48+ access_pattern = random :: access_pattern (),
49+ compaction_conf :: compaction_conf ()
4650 }).
4751
4852-type segment_ref () :: ra_log :segment_ref ().
7478-spec init (ra_uid (), file :filename_all (), non_neg_integer (),
7579 access_pattern (), [segment_ref ()],
7680 undefined | counters :counters_ref (),
81+ map (),
7782 unicode :chardata ()) -> state ().
78- init (UId , Dir , MaxOpen , AccessPattern , SegRefs0 , Counter , LogId )
83+ init (UId , Dir , MaxOpen , AccessPattern , SegRefs0 , Counter , CompConf , LogId )
7984 when is_binary (UId ) ->
8085 Cfg = # cfg {uid = UId ,
8186 log_id = LogId ,
8287 counter = Counter ,
8388 directory = Dir ,
84- access_pattern = AccessPattern },
89+ access_pattern = AccessPattern ,
90+ compaction_conf = CompConf },
8591 FlruHandler = fun ({_ , Seg }) ->
8692 _ = ra_log_segment :close (Seg ),
8793 decr_counter (Cfg , ? C_RA_LOG_OPEN_SEGMENTS , 1 )
@@ -150,6 +156,7 @@ update_segments(NewSegmentRefs, #?STATE{open_segments = Open0,
150156 [ra_server :effect ()].
151157schedule_compaction (Type , SnapIdx , LiveIndexes ,
152158 #? MODULE {cfg = # cfg {log_id = LogId ,
159+ compaction_conf = CompConf ,
153160 directory = Dir } = Cfg } = State ) ->
154161 case compactable_segrefs (SnapIdx , State ) of
155162 [] ->
@@ -171,21 +178,25 @@ schedule_compaction(Type, SnapIdx, LiveIndexes,
171178 Self = self (),
172179 Fun = fun () ->
173180 ok = incr_counter (Cfg , ? C_RA_LOG_COMPACTIONS_MAJOR_COUNT , 1 ),
174- MajConf = #{dir => Dir },
181+ MajConf = CompConf #{dir => Dir },
175182 Result = major_compaction (MajConf , SegRefs ,
176183 LiveIndexes ),
177- % % TODO: this could be done on a timer if more
178- % % timely symlink cleanup is needed
179- purge_symlinks (Dir , ? SYMLINK_KEEPFOR_S ),
180184 % % need to update the ra_servers list of seg refs
181185 % % _before_ the segments can actually be deleted
182186 Self ! {ra_log_event ,
183187 {compaction_result , Result }},
188+ % % TODO: this could be done on a timer if more
189+ % % timely symlink cleanup is needed
190+ purge_symlinks (Dir , ? SYMLINK_KEEPFOR_S ),
184191 ok
185192 end ,
186193
187194 [{bg_work , Fun ,
188195 fun (Err ) ->
196+ % % send an empty compaction result to ensure the
197+ % % a future compaction can be performed (TODO:)
198+ Self ! {ra_log_event ,
199+ {compaction_result , # compaction_result {}}},
189200 ? WARN (" ~ts : Major compaction failed with ~p " ,
190201 [LogId , Err ]), ok
191202 end }]
@@ -266,6 +277,10 @@ segment_ref_count(#?STATE{segment_refs = SegmentRefs}) ->
266277range (#? STATE {range = Range }) ->
267278 Range .
268279
280+ - spec compaction_conf (state ()) -> map ().
281+ compaction_conf (#? STATE {cfg = # cfg {compaction_conf = Conf }}) ->
282+ Conf .
283+
269284- spec num_open_segments (state ()) -> non_neg_integer ().
270285num_open_segments (#? STATE {open_segments = Open }) ->
271286 ra_flru :size (Open ).
@@ -572,7 +587,7 @@ list_files(Dir, Ext, Fun) ->
572587 []
573588 end .
574589
575- major_compaction (#{dir := Dir }, SegRefs , LiveIndexes ) ->
590+ major_compaction (#{dir := Dir } = CompConf , SegRefs , LiveIndexes ) ->
576591 {Compactable , Delete } =
577592 lists :foldl (fun ({Fn0 , Range } = S ,
578593 {Comps , Del }) ->
@@ -584,7 +599,7 @@ major_compaction(#{dir := Dir}, SegRefs, LiveIndexes) ->
584599 % % get the info map from each
585600 % % potential segment
586601 Fn = filename :join (Dir , Fn0 ),
587- Info = ra_log_segment :info (Fn ),
602+ Info = ra_log_segment :info (Fn , Seq ),
588603 {[{Info , Seq , S } | Comps ], Del }
589604 end
590605 end , {[], []}, SegRefs ),
@@ -602,7 +617,8 @@ major_compaction(#{dir := Dir}, SegRefs, LiveIndexes) ->
602617 ok = prim_file :delete (filename :join (Dir , F ))
603618 end || F <- UnusedFiles ],
604619 % % group compactable
605- CompactionGroups = compaction_groups (lists :reverse (Compactable ), []),
620+ CompactionGroups = compaction_groups (lists :reverse (Compactable ), [],
621+ CompConf ),
606622 Compacted0 =
607623 [begin
608624 % % create a new segment with .compacting extension
@@ -706,31 +722,37 @@ make_links(Dir, To, From)
706722with_ext (Fn , Ext ) when is_binary (Fn ) andalso is_list (Ext ) ->
707723 <<(filename :rootname (Fn ))/binary , (ra_lib :to_binary (Ext ))/binary >>.
708724
709- compaction_groups ([], Groups ) ->
725+ compaction_groups ([], Groups , _Conf ) ->
710726 lists :reverse (Groups );
711- compaction_groups (Infos , Groups ) ->
712- case take_group (Infos , #{ max_count => 128 } , []) of
727+ compaction_groups (Infos , Groups , Conf ) ->
728+ case take_group (Infos , Conf , []) of
713729 {Group , RemInfos } ->
714- compaction_groups (RemInfos , [Group | Groups ])
730+ compaction_groups (RemInfos , [Group | Groups ], Conf )
715731 end .
716732
717733% % TODO: try to take potential size into account
718734take_group ([], _ , Acc ) ->
719735 {lists :reverse (Acc ), []};
720- take_group ([{#{num_entries := NumEnts }, Live , {_ , _ }} = E | Rem ] = All ,
721- #{max_count := Mc }, Acc ) ->
736+ take_group ([{#{num_entries := NumEnts ,
737+ live_size := LiveSize }, Live , {_ , _ }} = E | Rem ] = All ,
738+ #{max_count := Mc ,
739+ max_size := MaxSz }, Acc ) ->
722740 Num = ra_seq :length (Live ),
723- case Num < NumEnts div 2 of
741+ case Num / NumEnts < 0.5 of
724742 true ->
725- case Mc - Num < 0 of
743+ case Mc - Num < 0 orelse
744+ MaxSz - LiveSize < 0 of
726745 true ->
727746 {lists :reverse (Acc ), All };
728747 false ->
729- take_group (Rem , #{max_count => Mc - Num }, [E | Acc ])
748+ take_group (Rem , #{max_count => Mc - Num ,
749+ max_size => MaxSz - LiveSize },
750+ [E | Acc ])
730751 end ;
731752 % % skip this secment
732753 false when Acc == [] ->
733- take_group (Rem , #{max_count => Mc }, Acc );
754+ take_group (Rem , #{max_count => Mc ,
755+ max_size => MaxSz }, Acc );
734756 false ->
735757 {lists :reverse (Acc ), Rem }
736758 end .
0 commit comments