@@ -573,6 +573,14 @@ decr_counter(#cfg{counter = undefined}, _, _) ->
573573segment_files (Dir , Fun ) ->
574574 list_files (Dir , " .segment" , Fun ).
575575
576+ % delete_files(Dir, Ext) ->
577+ % case list_files(Dir, Ext, fun (_) -> true end) of
578+ % [] ->
579+ % ok;
580+ % Files ->
581+ % [prim_file:delete(filename:join(Dir, F)) || F <- Files]
582+ % end.
583+
576584list_files (Dir , Ext ) ->
577585 list_files (Dir , Ext , fun (_ ) -> true end ).
578586
@@ -622,10 +630,13 @@ major_compaction(#{dir := Dir} = CompConf, SegRefs, LiveIndexes) ->
622630 CompConf ),
623631 Compacted0 =
624632 [begin
633+ % % create a compaction marker with the compaction group i
634+ CompactionMarker = filename :join (Dir , with_ext (CompGroupLeaderFn ,
635+ " .compaction_group" )),
636+ ok = ra_lib :write_file (CompactionMarker , term_to_binary (All )),
625637 % % create a new segment with .compacting extension
626- AllShortFns = [F || {_ , _ , {F , _ }} <- All ],
627- CompactingShortFn = make_compacting_file_name (AllShortFns ),
628- CompactingFn = filename :join (Dir , CompactingShortFn ),
638+ CompactingFn = filename :join (Dir , with_ext (CompGroupLeaderFn ,
639+ " .compacting" )),
629640 % % max_count is the sum of all live indexes for segments in the
630641 % % compaction group
631642 MaxCount = lists :sum ([ra_seq :length (S ) || {_ , S , _ } <- All ]),
@@ -641,28 +652,24 @@ major_compaction(#{dir := Dir} = CompConf, SegRefs, LiveIndexes) ->
641652 end , CompSeg0 , All ),
642653 ok = ra_log_segment :close (CompSeg ),
643654
644- % % link .compacting segment to the original .segment file
645- % % first we have to create a hard link to a new .compacted file
646- % % from the .compacting file (as we need to keep this as a marker
647- % % until the end
648- % % then we can rename this on top of the first segment file in the
649- % % group (the target)
650- FirstSegmentFn = filename :join (Dir , FstFn0 ),
651- CompactedFn = filename :join (Dir , with_ext (FstFn0 , " .compacted" )),
652- ok = prim_file :make_link (CompactingFn , CompactedFn ),
653- ok = prim_file :rename (CompactedFn , FirstSegmentFn ),
655+ FirstSegmentFn = filename :join (Dir , CompGroupLeaderFn ),
654656
655657 % % perform sym linking of the additional segments in the compaction
656- % % group
657- ok = make_links (Dir , FirstSegmentFn ,
658- [F || {_ , _ , {F , _ }} <- Additional ]),
659- % % finally deleted the .compacting file to signal compaction group
660- % % is complete
661- ok = prim_file :delete (CompactingFn ),
658+ % % group, the target is not yet updated which can be detected at
659+ % % recovery by the presence of a sym link _and_ the .compacting
660+ % % file
661+ ok = make_symlinks (Dir , FirstSegmentFn ,
662+ [F || {_ , _ , {F , _ }} <- Additional ]),
663+
664+ % % rename the .compacting segment on top of the group leader
665+ ok = prim_file :rename (CompactingFn , FirstSegmentFn ),
666+ % % finally delete the .compaction_marker file to signal
667+ % % compaction group is complete
668+ ok = prim_file :delete (CompactionMarker ),
662669 % % return the new segref and additional segment keys
663670 {ra_log_segment :segref (FirstSegmentFn ),
664671 [A || {_ , _ , {A , _ }} <- Additional ]}
665- end || [{_Info , _ , {FstFn0 , _ }} | Additional ] = All
672+ end || [{_Info , _ , {CompGroupLeaderFn , _ }} | Additional ] = All
666673 <- CompactionGroups ],
667674
668675 {Compacted , AddDelete } = lists :unzip (Compacted0 ),
@@ -705,7 +712,7 @@ compactable_segrefs(SnapIdx, State) ->
705712 end , [], Compactable )
706713 end .
707714
708- make_links (Dir , To , From )
715+ make_symlinks (Dir , To , From )
709716 when is_list (From ) ->
710717 [begin
711718 SymFn = filename :join (Dir , with_ext (FromFn , " .link" )),
@@ -769,55 +776,77 @@ take_group([{#{num_entries := NumEnts,
769776 {lists :reverse (Acc ), Rem }
770777 end .
771778
772-
773- parse_compacting_filename (Fn ) when is_binary (Fn ) ->
774- binary :split (filename :rootname (Fn ), <<" -" >>, [global ]).
775-
776- make_compacting_file_name ([N1 | Names ]) ->
777- Root = lists :foldl (fun (N , Acc ) ->
778- [filename :rootname (N ), <<" -" >> | Acc ]
779- end , [N1 ], Names ),
780- iolist_to_binary (lists :reverse ([<<" .compacting" >> | Root ])).
781-
782779recover_compaction (Dir ) ->
783- case list_files (Dir , " .compacting " ) of
780+ case list_files (Dir , " .compaction_group " ) of
784781 [] ->
785782 % % no pending compactions
786783 # compaction_result {};
787- [ShortFn ] ->
788- % % compaction recovery is needed
789- CompactingFn = filename :join (Dir , ShortFn ),
790- {ok , # file_info {links = Links }} =
791- file :read_link_info (CompactingFn , [raw , {time , posix }]),
792- case Links of
793- 1 ->
794- % % must have exited before the target file was renamed
795- % % just delete
796- ok = prim_file :delete (CompactingFn ),
784+ [CompactionGroupFn0 ] ->
785+ % % compaction recovery is needed as there is a .compaction_group file
786+ CompactionGroupFn = filename :join (Dir , CompactionGroupFn0 ),
787+ % % if corrupt, just delete .compaction_group file
788+ {ok , Bin } = prim_file :read_file (CompactionGroupFn ),
789+ CompactionGroup = try binary_to_term (Bin ) of
790+ Group ->
791+ Group
792+ catch _ :_ ->
793+ % % any error just return empty
794+ _ = prim_file :delete (CompactionGroupFn ),
795+ []
796+ end ,
797+
798+ % % there _may_ be a .compacting file
799+ CompactingFn = filename :join (Dir , with_ext (CompactionGroupFn0 ,
800+ " .compacting" )),
801+
802+ case CompactionGroup of
803+ [] ->
804+ # compaction_result {};
805+ [_ ] ->
806+ % % single segment compaction, we cannot know if the
807+ % % compaction into the compacting segment completed or
808+ % % not
809+ % % ignore return value as CompactingFn may not exist
810+ _ = prim_file :delete (CompactingFn ),
811+ ok = prim_file :delete (CompactionGroupFn ),
797812 # compaction_result {};
798- 2 ->
799- [FstFn | RemFns ] = parse_compacting_filename (ShortFn ),
800- % % there may be a .compacted file
801- Target = filename :join (Dir , with_ext (FstFn , " .segment" )),
802- case list_files (Dir , " .compacted" ) of
803- [CompactedShortFn ] ->
804- CompactedFn = filename :join (Dir , CompactedShortFn ),
805- % % all entries were copied but it failed before
806- % % this hard link could be renamed over the target
807- ok = prim_file :rename (CompactedFn , Target ),
808- ok ;
809- [] ->
810- % % links may not have been fully created,
811- % % delete all .link files then relink
812- ok
813- end ,
814- ok = make_links (Dir , Target , RemFns ),
815- ok = prim_file :delete (CompactingFn ),
816-
817- Linked = [with_ext (L , " .segment" ) || L <- RemFns ],
818- Compacted = [ra_log_segment :segref (Target )],
819- # compaction_result {compacted = Compacted ,
820- linked = Linked }
813+ [TargetShortFn | [FstLinkSeg | _ ] = LinkTargets ] ->
814+ % % multiple segments in group,
815+ % % if any of the additional segments is a symlink
816+ % % the writes to the .compacting segment completed and we
817+ % % can complete the compaction work
818+ FstLinkSegFn = filename :join (Dir , FstLinkSeg ),
819+ FstLinkSegLinkFn = filename :join (Dir , with_ext (FstLinkSeg , " .link" )),
820+ Target = filename :join (Dir , TargetShortFn ),
821+ AtLeastOneLink = ra_lib :is_any_file (FstLinkSegLinkFn ),
822+ CompactingExists = ra_lib :is_any_file (CompactingFn ),
823+ case file :read_link_info (FstLinkSegFn , [raw ]) of
824+ {ok , # file_info {type = Type }}
825+ when Type == symlink orelse
826+ AtLeastOneLink ->
827+ % % it is a symlink, recreate all symlinks and delete
828+ % % compaction marker
829+ ok = make_symlinks (Dir , Target , LinkTargets ),
830+ % % if compacting file exists, rename it to target
831+ if CompactingExists ->
832+ ok = prim_file :rename (CompactingFn , Target );
833+ true ->
834+ ok
835+ end ,
836+ ok = prim_file :delete (CompactionGroupFn ),
837+ Compacted = [ra_log_segment :segref (Target )],
838+ # compaction_result {compacted = Compacted ,
839+ linked = LinkTargets };
840+ {error , enoent } ->
841+ % % segment does not exist indicates what exactly?
842+ _ = prim_file :delete (CompactingFn ),
843+ ok = prim_file :delete (CompactionGroupFn ),
844+ # compaction_result {};
845+ {ok , # file_info {type = regular }} ->
846+ _ = prim_file :delete (CompactingFn ),
847+ ok = prim_file :delete (CompactionGroupFn ),
848+ # compaction_result {}
849+ end
821850 end
822851 end .
823852
0 commit comments