3333 next_index /1 ,
3434 snapshot_state /1 ,
3535 set_snapshot_state /2 ,
36- install_snapshot /3 ,
36+ install_snapshot /4 ,
3737 recover_snapshot /1 ,
3838 snapshot_index_term /1 ,
3939 update_release_cursor /5 ,
110110 reader :: ra_log_reader :state (),
111111 mem_table :: ra_mt :state (),
112112 tx = false :: boolean (),
113- pending = [] :: ra_seq :state ()
113+ pending = [] :: ra_seq :state (),
114+ live_indexes = [] :: ra_seq :state ()
114115 }).
115116
116117-record (read_plan , {dir :: file :filename_all (),
@@ -209,6 +210,13 @@ init(#{uid := UId,
209210 undefined -> {- 1 , - 1 };
210211 Curr -> Curr
211212 end ,
213+ % % TODO: error handling
214+ % % TODO: the "indexes" file isn't authoritative when it comes to live
215+ % % indexes, we need to recover the snapshot and query it for live indexes
216+ % % to get the actual valua
217+ {ok , LiveIndexes } = ra_snapshot :indexes (
218+ ra_snapshot :current_snapshot_dir (SnapshotState )),
219+ ct :pal (" log init live indexes ~p " , [LiveIndexes ]),
212220
213221 AccessPattern = maps :get (initial_access_pattern , Conf , sequential ),
214222 {ok , Mt0 } = ra_log_ets :mem_table_please (Names , UId ),
@@ -253,13 +261,15 @@ init(#{uid := UId,
253261 reader = Reader ,
254262 mem_table = Mt ,
255263 snapshot_state = SnapshotState ,
256- last_wal_write = {whereis (Wal ), now_ms ()}
264+ last_wal_write = {whereis (Wal ), now_ms ()},
265+ live_indexes = LiveIndexes
257266 },
258267 put_counter (Cfg , ? C_RA_SVR_METRIC_SNAPSHOT_INDEX , SnapIdx ),
259268 LastIdx = State0 #? MODULE .last_index ,
260269 put_counter (Cfg , ? C_RA_SVR_METRIC_LAST_INDEX , LastIdx ),
261270 put_counter (Cfg , ? C_RA_SVR_METRIC_LAST_WRITTEN_INDEX , LastIdx ),
262- put_counter (Cfg , ? C_RA_SVR_METRIC_NUM_SEGMENTS , ra_log_reader :segment_ref_count (Reader )),
271+ put_counter (Cfg , ? C_RA_SVR_METRIC_NUM_SEGMENTS ,
272+ ra_log_reader :segment_ref_count (Reader )),
263273 case ra_snapshot :latest_checkpoint (SnapshotState ) of
264274 undefined ->
265275 ok ;
@@ -527,7 +537,9 @@ fold(_From, _To, _Fun, Acc, State) ->
527537 {[log_entry ()], state ()}.
528538sparse_read (Indexes0 , #? MODULE {cfg = Cfg ,
529539 reader = Reader0 ,
540+ first_index = FstIdx ,
530541 last_index = LastIdx ,
542+ live_indexes = LiveIndexes ,
531543 mem_table = Mt } = State ) ->
532544 ok = incr_counter (Cfg , ? C_RA_LOG_READ_OPS , 1 ),
533545 % % indexes need to be sorted high -> low for correct and efficient reading
@@ -543,7 +555,20 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,
543555 end ,
544556
545557 % % drop any indexes that are larger than the last index available
546- Indexes2 = lists :dropwhile (fun (I ) -> I > LastIdx end , Indexes1 ),
558+ % % or smaller than first index and not in live indexes
559+ Indexes2 = lists :filter (fun (I ) ->
560+ I =< LastIdx andalso
561+ (case I < FstIdx of
562+ true ->
563+ % % TODO: this can be slow if
564+ % % live indexes are mostly
565+ % % non-contiguous. Create
566+ % % BST for live index lookups
567+ ra_seq :in (I , LiveIndexes );
568+ false ->
569+ true
570+ end )
571+ end , Indexes1 ),
547572 {Entries0 , MemTblNumRead , Indexes } = ra_mt :get_items (Indexes2 , Mt ),
548573 ok = incr_counter (Cfg , ? C_RA_LOG_READ_MEM_TBL , MemTblNumRead ),
549574 {Entries1 , Reader } = ra_log_reader :sparse_read (Reader0 , Indexes , Entries0 ),
@@ -811,6 +836,30 @@ handle_event({segments, TidRanges, NewSegs},
811836 pending = Pend ,
812837 mem_table = Mt },
813838 {State , []};
839+ handle_event ({segments_to_be_deleted , SegRefs },
840+ #? MODULE {cfg = # cfg {uid = UId ,
841+ log_id = LogId ,
842+ directory = Dir ,
843+ counter = Counter ,
844+ names = Names },
845+ reader = Reader } = State ) ->
846+ Fun = fun () ->
847+ [prim_file :delete (filename :join (Dir , F ))
848+ || {_ , F } <- SegRefs ],
849+ ok
850+ end ,
851+ ActiveSegs = ra_log_reader :segment_refs (Reader ) -- SegRefs ,
852+ #{max_size := MaxOpenSegments } = ra_log_reader :info (Reader ),
853+ % close all open segments
854+ ok = ra_log_reader :close (Reader ),
855+ ? DEBUG (" ~ts : ~b obsolete segments - remaining: ~b " ,
856+ [LogId , length (SegRefs ), length (ActiveSegs )]),
857+ % % open a new segment with the new max open segment value
858+ {State #? MODULE {reader = ra_log_reader :init (UId , Dir , MaxOpenSegments ,
859+ random ,
860+ ActiveSegs , Names , Counter )},
861+
862+ [{bg_work , Fun , fun (_Err ) -> ok end }]};
814863handle_event ({snapshot_written , {SnapIdx , _ } = Snap , LiveIndexes , SnapKind },
815864 #? MODULE {cfg = # cfg {uid = UId ,
816865 names = Names } = Cfg ,
@@ -848,15 +897,11 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
848897 false ->
849898 Snap
850899 end ,
851- % delete any segments outside of first_index
852- % {State, Effects1} = delete_segments (SnapIdx, State0),
900+
901+ CompEffs = schedule_compaction (SnapIdx , State0 ),
853902 State = State0 ,
854903
855904
856- % % TODO schedule compaction,
857- % SegRefs = ra_log_reader:segment_refs(Reader),
858- %
859-
860905 % % remove all pending below smallest live index as the wal
861906 % % may not write them
862907 % % TODO: test that a written even can still be processed if it
@@ -876,12 +921,13 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, LiveIndexes, SnapKind},
876921 {Spec , Mt1 } = ra_mt :set_first (SmallestLiveIdx , Mt0 ),
877922 ok = exec_mem_table_delete (Names , UId , Spec ),
878923
879- Effects = Effects0 , % ++ Effects1,
924+ Effects = CompEffs ++ Effects0 , % ++ Effects1,
880925 {State #? MODULE {first_index = SnapIdx + 1 ,
881926 last_index = max (LstIdx , SnapIdx ),
882927 last_written_index_term = LWIdxTerm ,
883928 mem_table = Mt1 ,
884929 pending = Pend ,
930+ live_indexes = LiveIndexes ,
885931 snapshot_state = SnapState }, Effects };
886932 checkpoint ->
887933 put_counter (Cfg , ? C_RA_SVR_METRIC_CHECKPOINT_INDEX , SnapIdx ),
@@ -957,19 +1003,20 @@ snapshot_state(State) ->
9571003set_snapshot_state (SnapState , State ) ->
9581004 State #? MODULE {snapshot_state = SnapState }.
9591005
960- -spec install_snapshot (ra_idxterm (), ra_snapshot :state (), state ()) ->
961- {state (), effects ()}.
962- install_snapshot ({SnapIdx , SnapTerm } = IdxTerm , SnapState0 ,
1006+ - spec install_snapshot (ra_idxterm (), ra_snapshot :state (), module (), state ()) ->
1007+ {ra_snapshot : meta (), MacState :: term (), state (), effects ()}.
1008+ install_snapshot ({SnapIdx , SnapTerm } = IdxTerm , SnapState0 , MacMod ,
9631009 #? MODULE {cfg = # cfg {uid = UId ,
9641010 names = Names } = Cfg ,
965- mem_table = Mt0 } = State0 ) ->
1011+ mem_table = Mt0 } = State0 )
1012+ when is_atom (MacMod ) ->
9661013 ok = incr_counter (Cfg , ? C_RA_LOG_SNAPSHOTS_INSTALLED , 1 ),
9671014 ok = put_counter (Cfg , ? C_RA_SVR_METRIC_SNAPSHOT_INDEX , SnapIdx ),
9681015 put_counter (Cfg , ? C_RA_SVR_METRIC_LAST_INDEX , SnapIdx ),
9691016 put_counter (Cfg , ? C_RA_SVR_METRIC_LAST_WRITTEN_INDEX , SnapIdx ),
970- % {State, Effs} = delete_segments(SnapIdx, State0),
971- % TODO: schedule compaction
972- State = State0 ,
1017+
1018+ CompEffs = schedule_compaction ( SnapIdx , State0 ),
1019+
9731020 {SnapState , Checkpoints } =
9741021 ra_snapshot :take_older_checkpoints (SnapIdx , SnapState0 ),
9751022 CPEffects = [{delete_snapshot ,
@@ -979,13 +1026,22 @@ install_snapshot({SnapIdx, SnapTerm} = IdxTerm, SnapState0,
9791026 % % below the snap idx
9801027 {Spec , Mt } = ra_mt :set_first (SnapIdx + 1 , Mt0 ),
9811028 ok = exec_mem_table_delete (Names , UId , Spec ),
982- {State #? MODULE {snapshot_state = SnapState ,
983- first_index = SnapIdx + 1 ,
984- last_index = SnapIdx ,
985- last_term = SnapTerm ,
986- mem_table = Mt ,
987- last_written_index_term = IdxTerm },
988- CPEffects }.
1029+ State = State0 #? MODULE {snapshot_state = SnapState ,
1030+ first_index = SnapIdx + 1 ,
1031+ last_index = SnapIdx ,
1032+ last_term = SnapTerm ,
1033+ mem_table = Mt ,
1034+ last_written_index_term = IdxTerm },
1035+ {Meta , MacState } = recover_snapshot (State ),
1036+ LiveIndexes = ra_machine :live_indexes (MacMod , MacState ),
1037+ % % TODO: it is not safe to write the indexes _after_ if we then treat
1038+ % % the persisted indexes as authoritative as if we crash in between
1039+ % % it may compact segments that still contain live indexes
1040+ SnapDir = ra_snapshot :current_snapshot_dir (SnapState ),
1041+ ok = ra_snapshot :write_indexes (SnapDir , LiveIndexes ),
1042+ {Meta , MacState , State #? MODULE {live_indexes = LiveIndexes },
1043+ CompEffs ++ CPEffects }.
1044+
9891045
9901046- spec recover_snapshot (State :: state ()) ->
9911047 option ({ra_snapshot :meta (), term ()}).
@@ -1267,6 +1323,44 @@ release_resources(MaxOpenSegments,
12671323
12681324% %% Local functions
12691325
1326+
1327+ schedule_compaction (SnapIdx , #? MODULE {cfg = # cfg {},
1328+ snapshot_state = SnapState ,
1329+ reader = Reader0 }) ->
1330+ case ra_log_reader :segment_refs (Reader0 ) of
1331+ [] ->
1332+ [];
1333+ [_ | Compactable ] ->
1334+ % % never compact the current segment
1335+ % % only take those who have a range lower than the snapshot index as
1336+ % % we never want to compact more than that
1337+ SegRefs = lists :takewhile (fun ({{_Start , End }, _ }) ->
1338+ End =< SnapIdx
1339+ end , lists :reverse (Compactable )),
1340+ SnapDir = ra_snapshot :current_snapshot_dir (SnapState ),
1341+
1342+ Self = self (),
1343+ Fun = fun () ->
1344+ {ok , Indexes } = ra_snapshot :indexes (SnapDir ),
1345+ {Delete , _ } = lists :foldl (
1346+ fun ({Range , _ } = S , {Del , Keep }) ->
1347+ case ra_seq :in_range (Range , Indexes ) of
1348+ [] ->
1349+ {[S | Del ], Keep };
1350+ _ ->
1351+ {Del , [S | Keep ]}
1352+ end
1353+ end , {[], []}, SegRefs ),
1354+ % % need to update the ra_servers list of seg refs _before_
1355+ % % the segments can actually be deleted
1356+ Self ! {ra_log_event ,
1357+ {segments_to_be_deleted , Delete }},
1358+ ok
1359+ end ,
1360+
1361+ [{bg_work , Fun , fun (_Err ) -> ok end }]
1362+ end .
1363+
12701364% % deletes all segments where the last index is lower than
12711365% % the Idx argument
12721366% delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId,
@@ -1364,7 +1458,7 @@ resend_pending(#?MODULE{cfg = Cfg,
13641458 last_resend_time = undefined ,
13651459 pending = Pend ,
13661460 mem_table = Mt } = State ) ->
1367- ct : pal (" ~ts : ra_log: resending from ~b to ~b mt ~p " ,
1461+ ? DEBUG (" ~ts : ra_log: resending from ~b to ~b mt ~p " ,
13681462 [State #? MODULE .cfg # cfg .log_id , ra_seq :first (Pend ),
13691463 ra_seq :last (Pend ), ra_mt :range (Mt )]),
13701464 ok = incr_counter (Cfg , ? C_RA_LOG_WRITE_RESENDS , ra_seq :length (Pend )),
0 commit comments