Skip to content

Commit e6dd4e6

Browse files
authored
Merge pull request #573 from IntersectMBO/mheinzel/index-merging-run
Add type parameter to MergingRun to limit possible merge types
2 parents 3553c33 + be14346 commit e6dd4e6

File tree

64 files changed

+196
-95
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+196
-95
lines changed

prototypes/ScheduledMerges.hs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,9 @@ invariant (LSMContent _ levels ul) = do
326326
Single r ->
327327
return (CompletedMerge r)
328328
Merging mp _ _ (MergingRun mt _ ref) -> do
329+
assertST $ ln > 1 -- no merges on level 1
329330
assertST $ mp == mergePolicyForLevel ln ls ul
330-
&& mt == mergeTypeForLevel ls ul
331+
assertST $ mt == mergeTypeForLevel ls ul
331332
readSTRef ref
332333

333334
assertST $ length rs <= 3
@@ -1274,17 +1275,19 @@ levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident >
12741275
-- first merge each input table into a single run, as there is no practical
12751276
-- distributive property between level and union merges.
12761277

1277-
-- | Ensures that the merge contains more than one input.
1278+
-- | Ensures that the merge contains more than one input, avoiding creating a
1279+
-- pending merge where possible.
12781280
newPendingLevelMerge :: [IncomingRun s]
12791281
-> Maybe (MergingTree s)
12801282
-> ST s (Maybe (MergingTree s))
12811283
newPendingLevelMerge [] t = return t
12821284
newPendingLevelMerge [Single r] Nothing =
1283-
-- If there is only a 'Merging' run, we could in principle also directly
1284-
-- turn that into 'OngoingTreeMerge`, but the type parameters don't match,
1285-
-- since it could be a midlevel merge. For simplicity, we don't handle that
1286-
-- case here, which means that there can be unary pending level merges.
12871285
Just . MergingTree <$> newSTRef (CompletedTreeMerge r)
1286+
newPendingLevelMerge [Merging{}] Nothing =
1287+
-- This case should never occur. If there is a single entry in the list,
1288+
-- there can only be one level in the input table. At level 1 there are no
1289+
-- merging runs, so it must be a PreExistingRun.
1290+
error "newPendingLevelMerge: singleton Merging run"
12881291
newPendingLevelMerge irs tree = do
12891292
let prs = map incomingToPreExistingRun irs
12901293
st = PendingTreeMerge (PendingLevelMerge prs tree)

src-extras/Database/LSMTree/Extras/Generators.hs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ import Database.LSMTree.Extras.Index (Append (..))
5454
import Database.LSMTree.Extras.Orphans ()
5555
import Database.LSMTree.Internal.BlobRef (BlobSpan (..))
5656
import Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..))
57-
import Database.LSMTree.Internal.Merge (MergeType (..))
57+
import qualified Database.LSMTree.Internal.Merge as Merge
58+
import qualified Database.LSMTree.Internal.MergingRun as MR
5859
import Database.LSMTree.Internal.Page (PageNo (..))
5960
import Database.LSMTree.Internal.RawBytes as RB
6061
import Database.LSMTree.Internal.Serialise
@@ -561,8 +562,19 @@ instance Arbitrary BlobSpan where
561562
Merge
562563
-------------------------------------------------------------------------------}
563564

564-
instance Arbitrary MergeType where
565-
arbitrary = QC.elements [MergeMidLevel, MergeLastLevel, MergeUnion]
566-
shrink MergeMidLevel = []
567-
shrink MergeLastLevel = [MergeMidLevel]
568-
shrink MergeUnion = [MergeLastLevel]
565+
instance Arbitrary Merge.MergeType where
566+
arbitrary = QC.elements
567+
[Merge.MergeMidLevel, Merge.MergeLastLevel, Merge.MergeUnion]
568+
shrink Merge.MergeMidLevel = []
569+
shrink Merge.MergeLastLevel = [Merge.MergeMidLevel]
570+
shrink Merge.MergeUnion = [Merge.MergeLastLevel]
571+
572+
instance Arbitrary MR.LevelMergeType where
573+
arbitrary = QC.elements [MR.MergeMidLevel, MR.MergeLastLevel]
574+
shrink MR.MergeMidLevel = []
575+
shrink MR.MergeLastLevel = [MR.MergeMidLevel]
576+
577+
instance Arbitrary MR.TreeMergeType where
578+
arbitrary = QC.elements [MR.MergeLevel, MR.MergeUnion]
579+
shrink MR.MergeLevel = []
580+
shrink MR.MergeUnion = [MR.MergeLevel]

src-extras/Database/LSMTree/Extras/NoThunks.hs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
{-# LANGUAGE CPP #-}
21
{-# LANGUAGE DataKinds #-}
32
{-# LANGUAGE QuantifiedConstraints #-}
43
{-# LANGUAGE UndecidableInstances #-}
@@ -341,10 +340,10 @@ deriving anyclass instance NoThunks MergeKnownCompleted
341340
MergingRun
342341
-------------------------------------------------------------------------------}
343342

344-
deriving stock instance Generic (MergingRun m h)
343+
deriving stock instance Generic (MergingRun t m h)
345344
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
346345
, NoThunks (StrictMVar m (MergingRunState m h))
347-
) => NoThunks (MergingRun m h)
346+
) => NoThunks (MergingRun t m h)
348347

349348
deriving stock instance Generic (MergingRunState m h)
350349
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ import Database.LSMTree.Internal.Entry (Entry, NumEntries (..),
4949
unNumEntries)
5050
import Database.LSMTree.Internal.Index (Index)
5151
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
52-
import Database.LSMTree.Internal.Merge (MergeType (..))
5352
import Database.LSMTree.Internal.MergingRun (MergingRun, NumRuns (..))
5453
import qualified Database.LSMTree.Internal.MergingRun as MR
5554
import Database.LSMTree.Internal.MergingTree (MergingTree)
@@ -95,7 +94,7 @@ data MergeTrace =
9594
RunDataCaching
9695
RunBloomFilterAlloc
9796
MergePolicyForLevel
98-
MergeType
97+
MR.LevelMergeType
9998
| TraceCompletedMerge -- TODO: currently not traced for Incremental merges
10099
NumEntries -- ^ Size of output run
101100
RunNumber
@@ -209,7 +208,7 @@ mkLevelsCache reg lvls = do
209208
foldRunAndMergeM ::
210209
Monoid a
211210
=> (Ref (Run m h) -> m a)
212-
-> (Ref (MergingRun m h) -> m a)
211+
-> (Ref (MergingRun MR.LevelMergeType m h) -> m a)
213212
-> Levels m h
214213
-> m a
215214
foldRunAndMergeM k1 k2 ls =
@@ -298,7 +297,7 @@ data Level m h = Level {
298297
-- | An incoming run is either a single run, or a merge.
299298
data IncomingRun m h =
300299
Single !(Ref (Run m h))
301-
| Merging !MergePolicyForLevel !(Ref (MergingRun m h))
300+
| Merging !MergePolicyForLevel !(Ref (MergingRun MR.LevelMergeType m h))
302301

303302
data MergePolicyForLevel = LevelTiering | LevelLevelling
304303
deriving stock (Show, Eq)
@@ -389,6 +388,7 @@ iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl
389388
-- * not stored in snapshots
390389
-- * not loaded from snapshots
391390
-- * ignored in lookups
391+
-- * never supplied with credits, so no merge progress is made on it
392392
-- * never merged into the regular levels
393393
data UnionLevel m h =
394394
NoUnion
@@ -575,12 +575,13 @@ flushWriteBuffer tr conf@TableConfig{confFencePointerIndex, confDiskCachePolicy}
575575
| otherwise = do
576576
!n <- incrUniqCounter uc
577577
let !size = WB.numEntries (tableWriteBuffer tc)
578-
!l = LevelNo 1
579-
!cache = diskCachePolicyForLevel confDiskCachePolicy l
580-
!alloc = bloomFilterAllocForLevel conf l
578+
!ln = LevelNo 1
579+
!cache = diskCachePolicyForLevel confDiskCachePolicy ln
580+
!alloc = bloomFilterAllocForLevel conf ln
581581
!indexType = indexTypeForRun confFencePointerIndex
582582
!path = Paths.runPath root (uniqueToRunNumber n)
583-
traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc
583+
traceWith tr $ AtLevel ln $
584+
TraceFlushWriteBuffer size (runNumber path) cache alloc
584585
r <- withRollback reg
585586
(Run.fromWriteBuffer hfs hbio
586587
cache
@@ -654,30 +655,30 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul
654655
traceWith tr $ AtLevel ln TraceAddLevel
655656
-- Make a new level
656657
let policyForLevel = mergePolicyForLevel confMergePolicy ln V.empty ul
657-
ir <- newMerge policyForLevel MergeLastLevel ln rs
658+
ir <- newMerge policyForLevel MR.MergeLastLevel ln rs
658659
return $! V.singleton $ Level ir V.empty
659660
go !ln rs' (V.uncons -> Just (Level ir rs, ls)) = do
660661
r <- expectCompletedMerge ln ir
661662
case mergePolicyForLevel confMergePolicy ln ls ul of
662663
-- If r is still too small for this level then keep it and merge again
663664
-- with the incoming runs.
664665
LevelTiering | Run.size r <= maxRunSize' conf LevelTiering (pred ln) -> do
665-
let mergelast = mergeLastForLevel ls ul
666-
ir' <- newMerge LevelTiering mergelast ln (rs' `V.snoc` r)
666+
let mergeType = mergeTypeForLevel ls ul
667+
ir' <- newMerge LevelTiering mergeType ln (rs' `V.snoc` r)
667668
pure $! Level ir' rs `V.cons` ls
668669
-- This tiering level is now full. We take the completed merged run
669670
-- (the previous incoming runs), plus all the other runs on this level
670671
-- as a bundle and move them down to the level below. We start a merge
671672
-- for the new incoming runs. This level is otherwise empty.
672673
LevelTiering | levelIsFull confSizeRatio rs -> do
673-
ir' <- newMerge LevelTiering MergeMidLevel ln rs'
674+
ir' <- newMerge LevelTiering MR.MergeMidLevel ln rs'
674675
ls' <- go (succ ln) (r `V.cons` rs) ls
675676
pure $! Level ir' V.empty `V.cons` ls'
676677
-- This tiering level is not yet full. We move the completed merged run
677678
-- into the level proper, and start the new merge for the incoming runs.
678679
LevelTiering -> do
679-
let mergelast = mergeLastForLevel ls ul
680-
ir' <- newMerge LevelTiering mergelast ln rs'
680+
let mergeType = mergeTypeForLevel ls ul
681+
ir' <- newMerge LevelTiering mergeType ln rs'
681682
traceWith tr $ AtLevel ln
682683
$ TraceAddRun
683684
(Run.runFsPathsNumber r)
@@ -689,13 +690,13 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul
689690
-- empty) level .
690691
LevelLevelling | Run.size r > maxRunSize' conf LevelLevelling ln -> do
691692
assert (V.null rs && V.null ls) $ pure ()
692-
ir' <- newMerge LevelTiering MergeMidLevel ln rs'
693+
ir' <- newMerge LevelTiering MR.MergeMidLevel ln rs'
693694
ls' <- go (succ ln) (V.singleton r) V.empty
694695
pure $! Level ir' V.empty `V.cons` ls'
695696
-- Otherwise we start merging the incoming runs into the run.
696697
LevelLevelling -> do
697698
assert (V.null rs && V.null ls) $ pure ()
698-
ir' <- newMerge LevelLevelling MergeLastLevel ln (rs' `V.snoc` r)
699+
ir' <- newMerge LevelLevelling MR.MergeLastLevel ln (rs' `V.snoc` r)
699700
pure $! Level ir' V.empty `V.cons` V.empty
700701

701702
-- Releases the incoming run.
@@ -713,7 +714,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul
713714

714715
-- Releases the runs.
715716
newMerge :: MergePolicyForLevel
716-
-> MergeType
717+
-> MR.LevelMergeType
717718
-> LevelNo
718719
-> V.Vector (Ref (Run m h))
719720
-> m (IncomingRun m h)
@@ -737,7 +738,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul
737738
!indexType = indexTypeForRun confFencePointerIndex
738739
!runPaths = Paths.runPath root (uniqueToRunNumber n)
739740
traceWith tr $ AtLevel ln $
740-
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeType
741+
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc
742+
mergePolicy mergeType
741743
-- The runs will end up inside the merging run, with fresh references.
742744
-- The original references can be released (but only on the happy path).
743745
mr <- withRollback reg
@@ -801,10 +803,10 @@ maxRunSize' config policy ln =
801803

802804
-- | If there are no further levels provided, this level is the last one.
803805
-- However, if a 'Union' is present, it acts as another (last) level.
804-
mergeLastForLevel :: Levels m h -> UnionLevel m h -> MergeType
805-
mergeLastForLevel levels unionLevel
806-
| V.null levels, NoUnion <- unionLevel = MergeLastLevel
807-
| otherwise = MergeMidLevel
806+
mergeTypeForLevel :: Levels m h -> UnionLevel m h -> MR.LevelMergeType
807+
mergeTypeForLevel levels unionLevel
808+
| V.null levels, NoUnion <- unionLevel = MR.MergeLastLevel
809+
| otherwise = MR.MergeMidLevel
808810

809811
levelIsFull :: SizeRatio -> V.Vector run -> Bool
810812
levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr)
@@ -895,7 +897,7 @@ supplyCredits conf c levels =
895897
-- merging work than 1 merge step for each credit.
896898
scaleCreditsForMerge ::
897899
MergePolicyForLevel
898-
-> Ref (MergingRun m h)
900+
-> Ref (MergingRun t m h)
899901
-> Credits
900902
-> MR.Credits
901903
scaleCreditsForMerge LevelLevelling _ (Credits c) =

0 commit comments

Comments
 (0)