@@ -230,9 +230,17 @@ init(#{uid := UId,
230230 SegRefs = my_segrefs (UId , SegWriter ),
231231 Reader = ra_log_reader :init (UId , Dir , MaxOpen , AccessPattern , SegRefs ,
232232 Names , Counter ),
233- SegRange = ra_log_reader :range (Reader ),
233+ SegmentRange = ra_log_reader :range (Reader ),
234234 % % TODO: check ra_range:add/2 actually performas the correct logic we expect
235- Range = ra_range :add (MtRange , SegRange ),
235+ Range = ra_range :add (MtRange , SegmentRange ),
236+
237+ % % TODO: review this
238+ [begin
239+ ? DEBUG (" ~ts : deleting overwritten segment ~w " ,
240+ [LogId , SR ]),
241+ catch prim_file :delete (filename :join (Dir , F ))
242+ end
243+ || {_ , F } = SR <- SegRefs -- ra_log_reader :segment_refs (Reader )],
236244
237245 % % assert there is no gap between the snapshot
238246 % % and the first index in the log
@@ -298,10 +306,10 @@ init(#{uid := UId,
298306 {_ , LI } ->
299307 fetch_term (LI , State0 )
300308 end ,
301- LastSegRefIdx = case SegRefs of
302- [] ->
309+ LastSegRefIdx = case SegmentRange of
310+ undefined ->
303311 - 1 ;
304- [{{ _ , L }, _ } | _ ] ->
312+ { _ , L } ->
305313 L
306314 end ,
307315 LastWrittenIdx = case ra_log_wal :last_writer_seq (Wal , UId ) of
@@ -311,7 +319,8 @@ init(#{uid := UId,
311319 {ok , Idx } ->
312320 max (Idx , LastSegRefIdx );
313321 {error , wal_down } ->
314- ? ERROR (" ~ts : ra_log:init/1 cannot complete as wal process is down." ,
322+ ? ERROR (" ~ts : ra_log:init/1 cannot complete as wal"
323+ " process is down." ,
315324 [State2 #? MODULE .cfg # cfg .log_id ]),
316325 exit (wal_down )
317326 end ,
@@ -332,9 +341,7 @@ init(#{uid := UId,
332341 State = maybe_append_first_entry (State4 ),
333342 ? DEBUG (" ~ts : ra_log:init recovered last_index_term ~w "
334343 " snapshot_index_term ~w , last_written_index_term ~w " ,
335- [State #? MODULE .cfg # cfg .log_id ,
336- last_index_term (State ),
337- {SnapIdx , SnapTerm },
344+ [LogId , last_index_term (State ), {SnapIdx , SnapTerm },
338345 State #? MODULE .last_written_index_term
339346 ]),
340347 assert (State ).
@@ -767,11 +774,15 @@ handle_event({written, Term, WrittenSeq},
767774 end
768775 end ;
769776handle_event ({segments , TidRanges , NewSegs },
770- #? MODULE {cfg = # cfg {uid = UId , names = Names } = Cfg ,
777+ #? MODULE {cfg = # cfg {uid = UId ,
778+ log_id = LogId ,
779+ directory = Dir ,
780+ names = Names } = Cfg ,
771781 reader = Reader0 ,
772782 pending = Pend0 ,
773783 mem_table = Mt0 } = State0 ) ->
774- Reader = ra_log_reader :update_segments (NewSegs , Reader0 ),
784+ {Reader , OverwrittenSegRefs } = ra_log_reader :update_segments (NewSegs , Reader0 ),
785+
775786 put_counter (Cfg , ? C_RA_SVR_METRIC_NUM_SEGMENTS ,
776787 ra_log_reader :segment_ref_count (Reader )),
777788 % % the tid ranges arrive in the reverse order they were written
@@ -782,6 +793,7 @@ handle_event({segments, TidRanges, NewSegs},
782793 ok = ra_log_ets :execute_delete (Names , UId , Spec ),
783794 Acc
784795 end , Mt0 , TidRanges ),
796+
785797 % % it is theoretically possible that the segment writer flush _could_
786798 % % over take WAL notifications
787799 % %
@@ -795,26 +807,35 @@ handle_event({segments, TidRanges, NewSegs},
795807 State = State0 #? MODULE {reader = Reader ,
796808 pending = Pend ,
797809 mem_table = Mt },
798- {State , []};
810+ Fun = fun () ->
811+ [begin
812+ ? DEBUG (" ~ts : deleting overwritten segment ~w " ,
813+ [LogId , SR ]),
814+ catch prim_file :delete (filename :join (Dir , F ))
815+ end
816+ || {_ , F } = SR <- OverwrittenSegRefs ],
817+ ok
818+ end ,
819+ {State , [{bg_work , Fun , fun (_Err ) -> ok end }]};
799820handle_event ({segments_to_be_deleted , SegRefs },
800821 #? MODULE {cfg = # cfg {uid = UId ,
801822 log_id = LogId ,
802823 directory = Dir ,
803824 counter = Counter ,
804825 names = Names },
805826 reader = Reader } = State ) ->
806- Fun = fun () ->
807- [prim_file :delete (filename :join (Dir , F ))
808- || {_ , F } <- SegRefs ],
809- ok
810- end ,
811827 ActiveSegs = ra_log_reader :segment_refs (Reader ) -- SegRefs ,
812828 #{max_size := MaxOpenSegments } = ra_log_reader :info (Reader ),
813829 % close all open segments
814830 ok = ra_log_reader :close (Reader ),
815831 ? DEBUG (" ~ts : ~b obsolete segments - remaining: ~b " ,
816832 [LogId , length (SegRefs ), length (ActiveSegs )]),
817833 % % open a new segment with the new max open segment value
834+ Fun = fun () ->
835+ [prim_file :delete (filename :join (Dir , F ))
836+ || {_ , F } <- SegRefs ],
837+ ok
838+ end ,
818839 {State #? MODULE {reader = ra_log_reader :init (UId , Dir , MaxOpenSegments ,
819840 random ,
820841 ActiveSegs , Names , Counter )},
@@ -824,8 +845,6 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
824845 #? MODULE {cfg = # cfg {uid = UId ,
825846 names = Names } = Cfg ,
826847 range = {FstIdx , _ } = Range ,
827- % first_index = FstIdx,
828- % last_index = LstIdx,
829848 mem_table = Mt0 ,
830849 pending = Pend0 ,
831850 last_written_index_term = {LastWrittenIdx , _ } = LWIdxTerm0 ,
@@ -1307,8 +1326,9 @@ release_resources(MaxOpenSegments,
13071326% %% Local functions
13081327
13091328
1310- schedule_compaction (SnapIdx , #? MODULE {cfg = # cfg {},
1311- snapshot_state = SnapState ,
1329+ schedule_compaction (SnapIdx , #? MODULE {cfg = # cfg {uid = _UId ,
1330+ segment_writer = _SegWriter },
1331+ live_indexes = LiveIndexes ,
13121332 reader = Reader0 }) ->
13131333 case ra_log_reader :segment_refs (Reader0 ) of
13141334 [] ->
@@ -1320,29 +1340,34 @@ schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{},
13201340 SegRefs = lists :takewhile (fun ({{_Start , End }, _ }) ->
13211341 End =< SnapIdx
13221342 end , lists :reverse (Compactable )),
1323- SnapDir = ra_snapshot :current_snapshot_dir (SnapState ),
1343+ % SnapDir = ra_snapshot:current_snapshot_dir(SnapState),
13241344
13251345 % % TODO: minor compactions should also delete / truncate
13261346 % % segments with completely overwritten indexes
13271347
13281348 Self = self (),
1329- Fun = fun () ->
1330- {ok , Indexes } = ra_snapshot :indexes (SnapDir ),
1331- {Delete , _ } = lists :foldl (
1332- fun ({Range , _ } = S , {Del , Keep }) ->
1333- case ra_seq :in_range (Range , Indexes ) of
1334- [] ->
1335- {[S | Del ], Keep };
1336- _ ->
1337- {Del , [S | Keep ]}
1338- end
1339- end , {[], []}, SegRefs ),
1340- % % need to update the ra_servers list of seg refs _before_
1341- % % the segments can actually be deleted
1342- Self ! {ra_log_event ,
1343- {segments_to_be_deleted , Delete }},
1344- ok
1345- end ,
1349+ Fun =
1350+ fun () ->
1351+ % {ok, Indexes} = ra_snapshot:indexes(SnapDir),
1352+
1353+ % % get all current segrefs
1354+ % AllSegRefs = my_segrefs(UId, SegWriter),
1355+ Delete = lists :foldl (
1356+ fun ({Range , _ } = S , Del ) ->
1357+ case ra_seq :in_range (Range ,
1358+ LiveIndexes ) of
1359+ [] ->
1360+ [S | Del ];
1361+ _ ->
1362+ Del
1363+ end
1364+ end , [], SegRefs ),
1365+ % % need to update the ra_servers list of seg refs _before_
1366+ % % the segments can actually be deleted
1367+ Self ! {ra_log_event ,
1368+ {segments_to_be_deleted , Delete }},
1369+ ok
1370+ end ,
13461371
13471372 [{bg_work , Fun , fun (_Err ) ->
13481373 ? WARN (" bgwork err ~p " , [_Err ]), ok
0 commit comments