@@ -15,26 +15,15 @@ module Database.LSMTree.Internal.MergeSchedule (
1515 , Levels
1616 , Level (.. )
1717 , IncomingRun (.. )
18- , MergingRun (.. )
19- , newMergingRun
20- , NumRuns (.. )
21- , UnspentCreditsVar (.. )
22- , MergingRunState (.. )
23- , TotalStepsVar (.. )
24- , SpentCreditsVar (.. )
25- , MergeKnownCompleted (.. )
2618 -- * Flushes and scheduled merges
2719 , updatesWithInterleavedFlushes
2820 , flushWriteBuffer
2921 -- * Exported for cabal-docspec
3022 , MergePolicyForLevel (.. )
3123 , maxRunSize
3224 -- * Credits
33- , Credit (.. )
25+ , Credits (.. )
3426 , supplyCredits
35- , ScaledCredits (.. )
36- , supplyMergeCredits
37- , CreditThreshold (.. )
3827 , creditThresholdForLevel
3928 ) where
4029
@@ -58,7 +47,9 @@ import Database.LSMTree.Internal.Entry (Entry, NumEntries (..),
5847import Database.LSMTree.Internal.Index.Compact (IndexCompact )
5948import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue )
6049import qualified Database.LSMTree.Internal.Merge as Merge
61- import Database.LSMTree.Internal.MergingRun
50+ import Database.LSMTree.Internal.MergingRun (MergePolicyForLevel (.. ),
51+ MergingRun , NumRuns (.. ))
52+ import qualified Database.LSMTree.Internal.MergingRun as MR
6253import Database.LSMTree.Internal.Paths (RunFsPaths (.. ),
6354 SessionRoot (.. ))
6455import qualified Database.LSMTree.Internal.Paths as Paths
@@ -190,7 +181,7 @@ mkLevelsCache ::
190181mkLevelsCache reg lvls = do
191182 rs <- foldRunAndMergeM
192183 (fmap V. singleton . dupRun)
193- (duplicateMergingRunRuns reg)
184+ (MR. duplicateRuns reg)
194185 lvls
195186 pure $! LevelsCache_ {
196187 cachedRuns = rs
@@ -416,7 +407,7 @@ updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es reg tc = do
416407 -- number of supplied credits is based on the size increase of the write
417408 -- buffer, not the the number of processed entries @length es' - length es@.
418409 let numAdded = unNumEntries (WB. numEntries wb') - unNumEntries (WB. numEntries wb)
419- supplyCredits conf (Credit numAdded) (tableLevels tc)
410+ supplyCredits conf (Credits numAdded) (tableLevels tc)
420411 let tc' = tc { tableWriteBuffer = wb' }
421412 if WB. numEntries wb' < maxn then do
422413 pure $! tc'
@@ -662,7 +653,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
662653 ir <- newMerge policyForLevel Merge. LastLevel ln rs
663654 return $! V. singleton $ Level ir V. empty
664655 go ! ln rs' (V. uncons -> Just (Level ir rs, ls)) = do
665- r <- expectCompletedMergeTraced ln ir
656+ r <- expectCompletedMerge ln ir
666657 case mergePolicyForLevel confMergePolicy ln ls of
667658 -- If r is still too small for this level then keep it and merge again
668659 -- with the incoming runs.
@@ -703,10 +694,11 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
703694 ir' <- newMerge LevelLevelling Merge. LastLevel ln (rs' `V.snoc` r)
704695 pure $! Level ir' V. empty `V.cons` V. empty
705696
706- expectCompletedMergeTraced :: LevelNo -> IncomingRun m h
707- -> m (Ref (Run m h ))
708- expectCompletedMergeTraced ln ir = do
709- r <- expectCompletedMerge reg ir
697+ expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h ))
698+ expectCompletedMerge ln ir = do
699+ r <- case ir of
700+ Single r -> pure r
701+ Merging mr -> MR. expectCompleted reg mr
710702 traceWith tr $ AtLevel ln $
711703 TraceExpectCompletedMerge (Run. runFsPathsNumber r)
712704 pure r
@@ -743,7 +735,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
743735 TraceCompletedMerge (Run. size r)
744736 (Run. runFsPathsNumber r)
745737 V. mapM_ (freeTemp reg . releaseRef) rs
746- Merging <$!> newMergingRun mergePolicy numInputRuns numInputEntries MergeKnownCompleted (CompletedMerge r)
738+ Merging <$!> MR. unsafeNew mergePolicy numInputRuns numInputEntries MR. MergeKnownCompleted (MR. CompletedMerge r)
747739
748740 Incremental -> do
749741 mergeMaybe <- allocateMaybeTemp reg
@@ -752,8 +744,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
752744 case mergeMaybe of
753745 Nothing -> error " newMerge: merges can not be empty"
754746 Just m -> do
755- spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
756- Merging <$!> newMergingRun mergePolicy numInputRuns numInputEntries MergeMaybeCompleted (OngoingMerge rs spentCreditsVar m)
747+ spentCreditsVar <- MR. SpentCreditsVar <$> newPrimVar 0
748+ Merging <$!> MR. unsafeNew mergePolicy numInputRuns numInputEntries MR. MergeMaybeCompleted (MR. OngoingMerge rs spentCreditsVar m)
757749
758750-- $setup
759751-- >>> import Database.LSMTree.Internal.Entry
@@ -868,9 +860,12 @@ mergeRuns resolve hfs hbio caching alloc runPaths mergeLevel runs = do
868860 can contribute to the same merge concurrently.
869861-}
870862
863+ -- | Merge credits that get supplied to a table's levels.
864+ newtype Credits = Credits Int
865+
871866{-# SPECIALISE supplyCredits ::
872867 TableConfig
873- -> Credit
868+ -> Credits
874869 -> Levels IO h
875870 -> IO ()
876871 #-}
@@ -879,31 +874,33 @@ mergeRuns resolve hfs hbio caching alloc runPaths mergeLevel runs = do
879874supplyCredits ::
880875 (MonadSTM m , MonadST m , MonadMVar m , MonadMask m )
881876 => TableConfig
882- -> Credit
877+ -> Credits
883878 -> Levels m h
884879 -> m ()
885880supplyCredits conf c levels =
886881 iforLevelM_ levels $ \ ln (Level ir _rs) ->
887- let ! c' = scaleCreditsForMerge ir c in
888- let ! creditsThresh = creditThresholdForLevel conf ln in
889- supplyMergeCredits c' creditsThresh ir
882+ case ir of
883+ Single {} -> pure ()
884+ Merging mr -> do
885+ let ! c' = scaleCreditsForMerge mr c
886+ let ! thresh = creditThresholdForLevel conf ln
887+ MR. supplyCredits c' thresh mr
890888
891889-- | Scale a number of credits to a number of merge steps to be performed, based
892890-- on the merging run.
893891--
894892-- Initially, 1 update supplies 1 credit. However, since merging runs have
895893-- different numbers of input runs/entries, we may have to a more or less
896894-- merging work than 1 merge step for each credit.
897- scaleCreditsForMerge :: IncomingRun m h -> Credit -> ScaledCredits
895+ scaleCreditsForMerge :: Ref ( MergingRun m h ) -> Credits -> MR. Credits
898896-- A single run is a trivially completed merge, so it requires no credits.
899- scaleCreditsForMerge (Single _) _ = ScaledCredits 0
900- scaleCreditsForMerge (Merging (DeRef MergingRun {.. })) (Credit c) =
901- case mergePolicy of
897+ scaleCreditsForMerge (DeRef mr) (Credits c) =
898+ case MR. mergePolicy mr of
902899 LevelTiering ->
903900 -- A tiering merge has 5 runs at most (one could be held back to merged
904901 -- again) and must be completed before the level is full (once 4 more
905902 -- runs come in).
906- ScaledCredits (c * (1 + 4 ))
903+ MR. Credits (c * (1 + 4 ))
907904 LevelLevelling ->
908905 -- A levelling merge has 1 input run and one resident run, which is (up
909906 -- to) 4x bigger than the others. It needs to be completed before
@@ -914,33 +911,13 @@ scaleCreditsForMerge (Merging (DeRef MergingRun {..})) (Credit c) =
914911 -- worst-case upper bound by looking at the sizes of the input runs.
915912 -- As as result, merge work would/could be more evenly distributed over
916913 -- time when the resident run is smaller than the worst case.
917- let NumRuns n = mergeNumRuns
914+ let NumRuns n = MR. mergeNumRuns mr
918915 -- same as division rounding up: ceiling (c * n / 4)
919- in ScaledCredits ((c * n + 3 ) `div` 4 )
920-
921- {-# SPECIALISE supplyMergeCredits :: ScaledCredits -> CreditThreshold -> IncomingRun IO h -> IO () #-}
922- -- | Supply the given amount of credits to a merging run. This /may/ cause an
923- -- ongoing merge to progress.
924- supplyMergeCredits ::
925- forall m h . (MonadSTM m , MonadST m , MonadMVar m , MonadMask m )
926- => ScaledCredits
927- -> CreditThreshold
928- -> IncomingRun m h
929- -> m ()
930- supplyMergeCredits _ _ Single {} = pure ()
931- supplyMergeCredits credits creditsThresh (Merging mr) =
932- supplyCreditsMergingRun credits creditsThresh mr
933-
934- {-# SPECIALISE expectCompletedMerge :: TempRegistry IO -> IncomingRun IO h -> IO (Ref (Run IO h)) #-}
935- expectCompletedMerge ::
936- (MonadMVar m , MonadSTM m , MonadST m , MonadMask m )
937- => TempRegistry m -> IncomingRun m h -> m (Ref (Run m h ))
938- expectCompletedMerge _ (Single r) = pure r
939- expectCompletedMerge reg (Merging mr) = expectCompletedMergingRun reg mr
916+ in MR. Credits ((c * n + 3 ) `div` 4 )
940917
941918-- TODO: the thresholds for doing merge work should be different for each level,
942919-- maybe co-prime?
943- creditThresholdForLevel :: TableConfig -> LevelNo -> CreditThreshold
920+ creditThresholdForLevel :: TableConfig -> LevelNo -> MR. CreditThreshold
944921creditThresholdForLevel conf (LevelNo _i) =
945922 let AllocNumEntries (NumEntries x) = confWriteBufferAlloc conf
946- in CreditThreshold x
923+ in MR. CreditThreshold x
0 commit comments