@@ -50,7 +50,28 @@ module ScheduledMerges (
5050 representationShape ,
5151 Event ,
5252 EventAt (.. ),
53- EventDetail (.. )
53+ EventDetail (.. ),
54+ MergingTree (.. ),
55+ MergingTreeState (.. ),
56+ PendingMerge (.. ),
57+ IncomingRun (.. ),
58+ MergingRun (.. ),
59+ MergingRunState (.. ),
60+ MergePolicy (.. ),
61+ IsMergeType (.. ),
62+ TreeMergeType (.. ),
63+ LevelMergeType (.. ),
64+ MergeDebt (.. ),
65+ Run ,
66+ supplyCreditsMergingTree ,
67+ remainingDebtMergingTree ,
68+ mergek ,
69+ mergeBatchSize ,
70+
71+ -- * Invariants
72+ Invariant ,
73+ evalInvariant ,
74+ treeInvariant ,
5475 ) where
5576
5677import Prelude hiding (lookup )
@@ -65,6 +86,7 @@ import Data.STRef
6586import qualified Control.Exception as Exc (assert )
6687import Control.Monad (foldM , forM , when )
6788import Control.Monad.ST
89+ import qualified Control.Monad.Trans.Except as E
6890import Control.Tracer (Tracer , contramap , traceWith )
6991import GHC.Stack (HasCallStack , callStack )
7092
@@ -104,7 +126,7 @@ data IncomingRun s = Merging !MergePolicy !(MergingRun LevelMergeType s)
104126-- | The merge policy for a LSM level can be either tiering or levelling.
105127-- In this design we use levelling for the last level, and tiering for
106128-- all other levels. The first level always uses tiering however, even if
107- -- it's also the last level. So 'MergePolicy' and 'IncomingMergeType ' are
129+ -- it's also the last level. So 'MergePolicy' and 'LevelMergeType ' are
108130-- orthogonal, all combinations are possible.
109131--
110132data MergePolicy = MergePolicyTiering | MergePolicyLevelling
@@ -272,9 +294,9 @@ mergePolicyForLevel _ _ _ = MergePolicyTiering
272294
273295-- | If there are no further levels provided, this level is the last one.
274296-- However, if a 'Union' is present, it acts as another (last) level.
275- levelMergeTypeForLevel :: [Level s ] -> UnionLevel s -> LevelMergeType
276- levelMergeTypeForLevel [] NoUnion = MergeLastLevel
277- levelMergeTypeForLevel _ _ = MergeMidLevel
297+ mergeTypeForLevel :: [Level s ] -> UnionLevel s -> LevelMergeType
298+ mergeTypeForLevel [] NoUnion = MergeLastLevel
299+ mergeTypeForLevel _ _ = MergeMidLevel
278300
279301-- | Note that the invariants rely on the fact that levelling is only used on
280302-- the last level.
@@ -284,7 +306,7 @@ invariant (LSMContent _ levels ul) = do
284306 levelsInvariant 1 levels
285307 case ul of
286308 NoUnion -> return ()
287- Union tree _ -> treeInvariant tree
309+ Union tree _ -> expectInvariant ( treeInvariant tree)
288310 where
289311 levelsInvariant :: Int -> Levels s -> ST s ()
290312 levelsInvariant ! _ [] = return ()
@@ -295,7 +317,7 @@ invariant (LSMContent _ levels ul) = do
295317 return (CompletedMerge r)
296318 Merging mp (MergingRun mt ref) -> do
297319 assertST $ mp == mergePolicyForLevel ln ls ul
298- && mt == levelMergeTypeForLevel ls ul
320+ && mt == mergeTypeForLevel ls ul
299321 readSTRef ref
300322
301323 assertST $ length rs <= 3
@@ -359,7 +381,7 @@ invariant (LSMContent _ levels ul) = do
359381 assertST $ all (\ r -> levellingRunSizeToLevel r <= ln+ 1 ) resident
360382
361383 MergePolicyTiering ->
362- case (ir, mrs, levelMergeTypeForLevel ls ul) of
384+ case (ir, mrs, mergeTypeForLevel ls ul) of
363385 -- A single incoming run (which thus didn't need merging) must be
364386 -- of the expected size already
365387 (Single r, m, _) -> do
@@ -390,33 +412,91 @@ invariant (LSMContent _ levels ul) = do
390412 assertST $ length rs == 4 || length rs == 5
391413 assertST $ all (\ r -> tieringRunSizeToLevel r == ln- 1 ) rs
392414
393- -- We don't make many assumptions apart from what the types already enforce.
394- -- In particular, there are no invariants on the progress of the merges,
395- -- since union merge credits are independent from the tables' regular level
396- -- merges.
397- treeInvariant :: MergingTree s -> ST s ()
398- treeInvariant (MergingTree treeState) = readSTRef treeState >>= \ case
399- CompletedTreeMerge _ ->
400- return ()
415+ -- We don't make many assumptions apart from what the types already enforce.
416+ -- In particular, there are no invariants on the progress of the merges,
417+ -- since union merge credits are independent from the tables' regular level
418+ -- merges.
419+ treeInvariant :: MergingTree s -> Invariant s ()
420+ treeInvariant tree@ (MergingTree treeState) = do
421+ liftI (readSTRef treeState) >>= \ case
422+ CompletedTreeMerge _ ->
423+ -- We don't require the completed merges to be non-empty, since even
424+ -- a (last-level) merge of non-empty runs can end up being empty.
425+ -- In the prototype it would be possible to ensure that empty runs are
426+ -- immediately trimmed from the tree, but this kind of normalisation
427+ -- is complicated with sharing. For example, merging runs and
428+ -- trees are shared, so if one of them completes as an empty run,
429+ -- all tables referencing it suddenly contain an empty run and would
430+ -- need to be updated immediately.
431+ return ()
432+
433+ OngoingTreeMerge mr ->
434+ mergeInvariant mr
435+
436+ PendingTreeMerge (PendingLevelMerge irs t) -> do
437+ -- Non-empty, but can be just one input (see 'newPendingLevelMerge').
438+ -- Note that children of a pending merge can be empty runs, as noted
439+ -- above for 'CompletedTreeMerge'.
440+ assertI " pending level merges have at least one input" $
441+ length irs + length t > 0
442+ for_ irs $ \ case
443+ Single _ -> return ()
444+ Merging _ mr -> mergeInvariant mr
445+ for_ t treeInvariant
446+
447+ PendingTreeMerge (PendingUnionMerge ts) -> do
448+ assertI " pending union merges are non-trivial (at least two inputs)" $
449+ length ts > 1
450+ for_ ts treeInvariant
451+
452+ (debt, _) <- liftI $ remainingDebtMergingTree tree
453+ when (debt <= 0 ) $ do
454+ _ <- isCompletedMergingTree tree
455+ return ()
456+
457+ mergeInvariant :: MergingRun t s -> Invariant s ()
458+ mergeInvariant (MergingRun _ ref) =
459+ liftI (readSTRef ref) >>= \ case
460+ CompletedMerge _ -> return ()
461+ OngoingMerge _ rs _ -> do
462+ assertI " inputs to ongoing merges aren't empty" $
463+ all (\ r -> runSize r > 0 ) rs
464+ assertI " ongoing merges are non-trivial (at least two inputs)" $
465+ length rs > 1
466+
467+ isCompletedMergingRun :: MergingRun t s -> Invariant s Run
468+ isCompletedMergingRun (MergingRun _ ref) = do
469+ mrs <- liftI $ readSTRef ref
470+ case mrs of
471+ CompletedMerge r -> return r
472+ OngoingMerge d _ _ -> failI $ " not completed: OngoingMerge with"
473+ ++ " remaining debt " ++ show d
401474
402- OngoingTreeMerge (MergingRun _ mergeState) -> do
403- readSTRef mergeState >>= \ case
404- CompletedMerge _ -> return ()
405- OngoingMerge _ rs _ -> do
406- -- Inputs to ongoing merges aren't empty (but can while pending!).
407- assertST $ all (\ r -> runSize r > 0 ) rs
408- -- Merges are non-trivial (at least two inputs).
409- assertST $ length rs > 1
475+ isCompletedMergingTree :: MergingTree s -> Invariant s Run
476+ isCompletedMergingTree (MergingTree ref) = do
477+ mts <- liftI $ readSTRef ref
478+ case mts of
479+ CompletedTreeMerge r -> return r
480+ OngoingTreeMerge mr -> isCompletedMergingRun mr
481+ PendingTreeMerge _ -> failI $ " not completed: PendingTreeMerge"
410482
411- PendingTreeMerge (PendingLevelMerge irs tree) -> do
412- -- No empty merge, but could be just one input.
413- assertST $ length irs + length tree > 0
414- for_ tree treeInvariant
483+ type Invariant s = E. ExceptT String (ST s )
415484
416- PendingTreeMerge (PendingUnionMerge trees) -> do
417- -- Merges are non-trivial (at least two inputs).
418- assertST $ length trees > 1
419- for_ trees treeInvariant
485+ assertI :: String -> Bool -> Invariant s ()
486+ assertI _ True = return ()
487+ assertI e False = failI e
488+
489+ failI :: String -> Invariant s a
490+ failI = E. throwE
491+
492+ liftI :: ST s a -> Invariant s a
493+ liftI = E. ExceptT . fmap Right
494+
495+ expectInvariant :: HasCallStack => Invariant s a -> ST s a
496+ expectInvariant act = E. runExceptT act >>= either error return
497+
498+ evalInvariant :: Invariant s a -> ST s (Either String a )
499+ evalInvariant = E. runExceptT
420500
421501-- 'callStack' just ensures that the 'HasCallStack' constraint is not redundant
422502-- when compiling with debug assertions disabled.
@@ -435,7 +515,7 @@ newMergingRun mdebt mergeType runs = do
435515 assertST $ length runs > 1
436516 -- in some cases, no merging is required at all
437517 state <- case filter (\ r -> runSize r > 0 ) runs of
438- [] -> return $ CompletedMerge Map. empty
518+ [] -> return $ CompletedMerge ( head runs) -- just re-use the empty input
439519 [r] -> return $ CompletedMerge r
440520 rs -> do
441521 let ! cost = sum (map runSize rs)
@@ -480,12 +560,7 @@ combineUnion (Insert v' b') (Mupsert v) = Insert (resolveValue v' v) b'
480560combineUnion (Insert v' b') (Insert v _) = Insert (resolveValue v' v) b'
481561
482562expectCompletedMergingRun :: HasCallStack => MergingRun t s -> ST s Run
483- expectCompletedMergingRun (MergingRun _ ref) = do
484- mrs <- readSTRef ref
485- case mrs of
486- CompletedMerge r -> return r
487- OngoingMerge d _ _ -> error $ " expectCompletedMergingRun:"
488- ++ " remaining debt of " ++ show d
563+ expectCompletedMergingRun = expectInvariant . isCompletedMergingRun
489564
490565supplyCreditsMergingRun :: Credit -> MergingRun t s -> ST s Credit
491566supplyCreditsMergingRun = checked remainingDebtMergingRun $ \ credits (MergingRun _ ref) -> do
@@ -685,14 +760,8 @@ supplyUnionCredits (LSMHandle scr lsmr) credits
685760 _debt <- checkedUnionDebt tree debtRef -- just to make sure it's checked
686761 c' <- supplyCreditsMergingTree credits tree
687762 debt' <- checkedUnionDebt tree debtRef
688- if (debt' > 0 )
689- then
690- -- should have spent these credits
691- assertST $ c' == 0
692- else do
693- -- check it really is done
694- _ <- expectCompletedMergingTree tree
695- return ()
763+ when (debt' > 0 ) $
764+ assertST $ c' == 0 -- should have spent these credits
696765 invariant content
697766 return c'
698767
@@ -863,7 +932,7 @@ increment tr sc run0 ls0 ul = do
863932 go 1 [run0] ls0
864933 where
865934 mergeTypeFor :: Levels s -> LevelMergeType
866- mergeTypeFor ls = levelMergeTypeForLevel ls ul
935+ mergeTypeFor ls = mergeTypeForLevel ls ul
867936
868937 go :: Int -> [Run ] -> Levels s -> ST s (Levels s )
869938 go ! ln incoming [] = do
@@ -1010,10 +1079,13 @@ newPendingLevelMerge :: [IncomingRun s]
10101079 -> Maybe (MergingTree s )
10111080 -> ST s (Maybe (MergingTree s ))
10121081newPendingLevelMerge [] t = return t
1082+ newPendingLevelMerge [Single r] Nothing =
1083+ -- If there is only a 'Merging' run, we could in principle also directly
1084+ -- turn that into 'OngoingTreeMerge', but the type parameters don't match,
1085+ -- since it could be a midlevel merge. For simplicity, we don't handle that
1086+ -- case here, which means that there can be unary pending level merges.
1087+ Just . MergingTree <$> newSTRef (CompletedTreeMerge r)
10131088newPendingLevelMerge irs tree = do
1014- -- If there is just a single IncomingRun, we could directly turn that into
1015- -- a MergingTree, but it's not necessary and a little complicated because
1016- -- of the LevelMergeType/TreeMergeType mismatch.
10171089 let st = PendingTreeMerge (PendingLevelMerge irs tree)
10181090 Just . MergingTree <$> newSTRef st
10191091
@@ -1059,7 +1131,11 @@ remainingDebtPendingMerge (PendingMerge _ irs trees) = do
10591131 , traverse remainingDebtMergingTree trees
10601132 ]
10611133 let totalSize = sum sizes
1062- let totalDebt = sum debts + totalSize
1134+ -- A pending merge should never have 0 remaining debt. It needs some work to
1136+ -- complete it, even if all its inputs are empty. It's not enough to use
1136+ -- @max 1@, as this would violate the property that supplying N credits
1137+ -- reduces the remaining debt by at least N.
1138+ let totalDebt = sum debts + totalSize + 1
10631139 return (totalDebt, totalSize)
10641140 where
10651141 remainingDebtIncomingRun = \ case
@@ -1179,11 +1255,7 @@ expectCompletedChildren (PendingMerge mt irs trees) = do
11791255 Merging _ mr -> expectCompletedMergingRun mr
11801256
11811257expectCompletedMergingTree :: HasCallStack => MergingTree s -> ST s Run
1182- expectCompletedMergingTree (MergingTree ref) = do
1183- readSTRef ref >>= \ case
1184- CompletedTreeMerge r -> return r
1185- OngoingTreeMerge mr -> expectCompletedMergingRun mr
1186- PendingTreeMerge _ -> error $ " expectCompletedMergingTree: PendingTreeMerge"
1258+ expectCompletedMergingTree = expectInvariant . isCompletedMergingTree
11871259
11881260-------------------------------------------------------------------------------
11891261-- Measurements
@@ -1345,3 +1417,9 @@ instance (QC.Arbitrary v, QC.Arbitrary b) => QC.Arbitrary (Update v b) where
13451417 , (1 , Mupsert <$> QC. arbitrary)
13461418 , (1 , pure Delete )
13471419 ]
1420+
1421+ instance QC. Arbitrary LevelMergeType where
1422+ arbitrary = QC. elements [MergeMidLevel , MergeLastLevel ]
1423+
1424+ instance QC. Arbitrary TreeMergeType where
1425+ arbitrary = QC. elements [MergeLevel , MergeUnion ]
0 commit comments