Skip to content

Commit 285fbaa

Browse files
authored
Merge pull request #563 from IntersectMBO/mheinzel/union-level
Add UnionLevel to TableContent
2 parents cb3d0ce + 1444848 commit 285fbaa

File tree

3 files changed

+144
-28
lines changed

3 files changed

+144
-28
lines changed

src-extras/Database/LSMTree/Extras/NoThunks.hs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import Database.LSMTree.Internal.Merge
4747
import qualified Database.LSMTree.Internal.Merge as Merge
4848
import Database.LSMTree.Internal.MergeSchedule
4949
import Database.LSMTree.Internal.MergingRun
50+
import Database.LSMTree.Internal.MergingTree
5051
import Database.LSMTree.Internal.Page
5152
import Database.LSMTree.Internal.PageAcc
5253
import Database.LSMTree.Internal.Paths
@@ -290,6 +291,7 @@ deriving anyclass instance NoThunks PageNo
290291
deriving stock instance Generic (TableContent m h)
291292
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
292293
, NoThunks (StrictMVar m (MergingRunState m h))
294+
, NoThunks (StrictMVar m (MergingTreeState m h))
293295
) => NoThunks (TableContent m h)
294296

295297
deriving stock instance Generic (LevelsCache m h)
@@ -306,14 +308,11 @@ deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
306308
, NoThunks (StrictMVar m (MergingRunState m h))
307309
) => NoThunks (IncomingRun m h)
308310

309-
deriving stock instance Generic (MergingRun m h)
311+
deriving stock instance Generic (UnionLevel m h)
310312
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
311313
, NoThunks (StrictMVar m (MergingRunState m h))
312-
) => NoThunks (MergingRun m h)
313-
314-
deriving stock instance Generic (MergingRunState m h)
315-
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
316-
=> NoThunks (MergingRunState m h)
314+
, NoThunks (StrictMVar m (MergingTreeState m h))
315+
) => NoThunks (UnionLevel m h)
317316

318317
deriving stock instance Generic MergePolicyForLevel
319318
deriving anyclass instance NoThunks MergePolicyForLevel
@@ -333,6 +332,46 @@ deriving anyclass instance Typeable s => NoThunks (SpentCreditsVar s)
333332
deriving stock instance Generic MergeKnownCompleted
334333
deriving anyclass instance NoThunks MergeKnownCompleted
335334

335+
{-------------------------------------------------------------------------------
336+
MergingRun
337+
-------------------------------------------------------------------------------}
338+
339+
deriving stock instance Generic (MergingRun m h)
340+
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
341+
, NoThunks (StrictMVar m (MergingRunState m h))
342+
) => NoThunks (MergingRun m h)
343+
344+
deriving stock instance Generic (MergingRunState m h)
345+
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
346+
=> NoThunks (MergingRunState m h)
347+
348+
{-------------------------------------------------------------------------------
349+
MergingTree
350+
-------------------------------------------------------------------------------}
351+
352+
deriving stock instance Generic (MergingTree m h)
353+
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
354+
, NoThunks (StrictMVar m (MergingRunState m h))
355+
, NoThunks (StrictMVar m (MergingTreeState m h))
356+
) => NoThunks (MergingTree m h)
357+
358+
deriving stock instance Generic (MergingTreeState m h)
359+
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
360+
, NoThunks (StrictMVar m (MergingRunState m h))
361+
, NoThunks (StrictMVar m (MergingTreeState m h))
362+
) => NoThunks (MergingTreeState m h)
363+
364+
deriving stock instance Generic (PendingMerge m h)
365+
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
366+
, NoThunks (StrictMVar m (MergingRunState m h))
367+
, NoThunks (StrictMVar m (MergingTreeState m h))
368+
) => NoThunks (PendingMerge m h)
369+
370+
deriving stock instance Generic (PreExistingRun m h)
371+
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
372+
, NoThunks (StrictMVar m (MergingRunState m h))
373+
) => NoThunks (PreExistingRun m h)
374+
336375
{-------------------------------------------------------------------------------
337376
Entry
338377
-------------------------------------------------------------------------------}

src/Database/LSMTree/Internal.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,7 @@ new sesh conf = do
711711
, tableWriteBufferBlobs
712712
, tableLevels
713713
, tableCache
714+
, tableUnionLevel = NoUnion
714715
}
715716
newWith reg sesh seshEnv conf am tc
716717

@@ -1250,6 +1251,7 @@ openSnapshot sesh label tableType override snap resolve = do
12501251
, tableWriteBufferBlobs
12511252
, tableLevels
12521253
, tableCache
1254+
, tableUnionLevel = NoUnion -- TODO: at some point also load union level from snapshot
12531255
}
12541256

12551257
{-# SPECIALISE deleteSnapshot ::

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 97 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ module Database.LSMTree.Internal.MergeSchedule (
1616
, Level (..)
1717
, IncomingRun (..)
1818
, MergePolicyForLevel (..)
19+
-- * Union level
20+
, UnionLevel (..)
1921
-- * Flushes and scheduled merges
2022
, updatesWithInterleavedFlushes
2123
, flushWriteBuffer
@@ -50,6 +52,7 @@ import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
5052
import Database.LSMTree.Internal.Merge (MergeType (..))
5153
import Database.LSMTree.Internal.MergingRun (MergingRun, NumRuns (..))
5254
import qualified Database.LSMTree.Internal.MergingRun as MR
55+
import Database.LSMTree.Internal.MergingTree (MergingTree)
5356
import Database.LSMTree.Internal.Paths (RunFsPaths (..),
5457
SessionRoot (..))
5558
import qualified Database.LSMTree.Internal.Paths as Paths
@@ -108,15 +111,21 @@ data MergeTrace =
108111
Table content
109112
-------------------------------------------------------------------------------}
110113

114+
-- | The levels of the table, from most to least recently inserted.
111115
data TableContent m h = TableContent {
112-
--TODO: probably less allocation to make this a MutVar
116+
-- | The in-memory level 0 of the table
117+
--
118+
-- TODO: probably less allocation to make this a MutVar
113119
tableWriteBuffer :: !WriteBuffer
114120
-- | The blob storage for entries in the write buffer
115121
, tableWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h))
116-
-- | A hierarchy of levels. The vector indexes double as level numbers.
122+
-- | A hierarchy of \"regular\" on-disk levels numbered 1 and up. Note that
123+
-- vector index @n@ refers to level @n+1@.
117124
, tableLevels :: !(Levels m h)
118-
-- | Cache of flattened 'levels'.
125+
-- | Cache of flattened regular 'levels'.
119126
, tableCache :: !(LevelsCache m h)
127+
-- | An optional final union level, not included in the table cache.
128+
, tableUnionLevel :: !(UnionLevel m h)
120129
}
121130

122131
{-# SPECIALISE duplicateTableContent :: ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-}
@@ -125,22 +134,24 @@ duplicateTableContent ::
125134
=> ActionRegistry m
126135
-> TableContent m h
127136
-> m (TableContent m h)
128-
duplicateTableContent reg (TableContent wb wbb levels cache) = do
137+
duplicateTableContent reg (TableContent wb wbb levels cache ul) = do
129138
wbb' <- withRollback reg (dupRef wbb) releaseRef
130139
levels' <- duplicateLevels reg levels
131140
cache' <- duplicateLevelsCache reg cache
132-
return $! TableContent wb wbb' levels' cache'
141+
ul' <- duplicateUnionLevel reg ul
142+
return $! TableContent wb wbb' levels' cache' ul'
133143

134144
{-# SPECIALISE releaseTableContent :: ActionRegistry IO -> TableContent IO h -> IO () #-}
135145
releaseTableContent ::
136146
(PrimMonad m, MonadMask m)
137147
=> ActionRegistry m
138148
-> TableContent m h
139149
-> m ()
140-
releaseTableContent reg (TableContent _wb wbb levels cache) = do
150+
releaseTableContent reg (TableContent _wb wbb levels cache ul) = do
141151
delayedCommit reg (releaseRef wbb)
142152
releaseLevels reg levels
143153
releaseLevelsCache reg cache
154+
releaseUnionLevel reg ul
144155

145156
{-------------------------------------------------------------------------------
146157
Levels cache
@@ -276,7 +287,9 @@ releaseLevelsCache reg cache =
276287

277288
type Levels m h = V.Vector (Level m h)
278289

279-
-- | Runs in order from newer to older
290+
-- | A level is a sequence of resident runs at this level, prefixed by an
291+
-- incoming run, which is usually multiple runs that are being merged. Once
292+
-- completed, the resulting run will become a resident run at this level.
280293
data Level m h = Level {
281294
incomingRun :: !(IncomingRun m h)
282295
, residentRuns :: !(V.Vector (Ref (Run m h)))
@@ -294,13 +307,23 @@ instance NFData MergePolicyForLevel where
294307
rnf LevelTiering = ()
295308
rnf LevelLevelling = ()
296309

297-
mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels m h -> MergePolicyForLevel
298-
mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels
310+
-- | We use levelling on the last level, unless that is also the first level.
311+
mergePolicyForLevel ::
312+
MergePolicy
313+
-> LevelNo
314+
-> Levels m h
315+
-> UnionLevel m h
316+
-> MergePolicyForLevel
317+
mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels unionLevel
299318
| n == 1
300-
, V.null nextLevels
301319
= LevelTiering -- always use tiering on first level
302-
| V.null nextLevels = LevelLevelling -- levelling on last level
303-
| otherwise = LevelTiering
320+
321+
| V.null nextLevels
322+
, NoUnion <- unionLevel
323+
= LevelLevelling -- levelling on last level
324+
325+
| otherwise
326+
= LevelTiering
304327

305328
{-# SPECIALISE duplicateLevels :: ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-}
306329
duplicateLevels ::
@@ -353,6 +376,50 @@ releaseIncomingRun reg (Merging _ mr) = delayedCommit reg (releaseRef mr)
353376
iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m ()
354377
iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl
355378

379+
{-------------------------------------------------------------------------------
380+
Union level
381+
-------------------------------------------------------------------------------}
382+
383+
-- | An additional optional last level, created as a result of
384+
-- 'Database.LSMTree.Monoidal.union'. It can not only contain an ongoing merge
385+
-- of multiple runs, but a nested tree of merges.
386+
--
387+
-- TODO: So far, this is
388+
-- * never created
389+
-- * not stored in snapshots
390+
-- * not loaded from snapshots
391+
-- * ignored in lookups
392+
-- * never merged into the regular levels
393+
data UnionLevel m h =
394+
NoUnion
395+
| Union !(Ref (MergingTree m h))
396+
397+
{-# SPECIALISE duplicateUnionLevel ::
398+
ActionRegistry IO
399+
-> UnionLevel IO h
400+
-> IO (UnionLevel IO h) #-}
401+
duplicateUnionLevel ::
402+
(PrimMonad m, MonadMask m)
403+
=> ActionRegistry m
404+
-> UnionLevel m h
405+
-> m (UnionLevel m h)
406+
duplicateUnionLevel reg ul =
407+
case ul of
408+
NoUnion -> return ul
409+
Union tree -> Union <$> withRollback reg (dupRef tree) releaseRef
410+
411+
{-# SPECIALISE releaseUnionLevel ::
412+
ActionRegistry IO
413+
-> UnionLevel IO h
414+
-> IO () #-}
415+
releaseUnionLevel ::
416+
(PrimMonad m, MonadMask m)
417+
=> ActionRegistry m
418+
-> UnionLevel m h
419+
-> m ()
420+
releaseUnionLevel _ NoUnion = return ()
421+
releaseUnionLevel reg (Union tree) = delayedCommit reg (releaseRef tree)
422+
356423
{-------------------------------------------------------------------------------
357424
Flushes and scheduled merges
358425
-------------------------------------------------------------------------------}
@@ -524,13 +591,17 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy}
524591
delayedCommit reg (releaseRef (tableWriteBufferBlobs tc))
525592
wbblobs' <- withRollback reg (WBB.new hfs (Paths.tableBlobPath root n))
526593
releaseRef
527-
levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc)
594+
levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg
595+
(tableLevels tc)
596+
(tableUnionLevel tc)
528597
tableCache' <- rebuildCache reg (tableCache tc) levels'
529598
pure $! TableContent {
530599
tableWriteBuffer = WB.empty
531600
, tableWriteBufferBlobs = wbblobs'
532601
, tableLevels = levels'
533602
, tableCache = tableCache'
603+
-- TODO: move into regular levels if merge completed and size fits
604+
, tableUnionLevel = tableUnionLevel tc
534605
}
535606

536607
{-# SPECIALISE addRunToLevels ::
@@ -544,6 +615,7 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy}
544615
-> Ref (Run IO h)
545616
-> ActionRegistry IO
546617
-> Levels IO h
618+
-> UnionLevel IO h
547619
-> IO (Levels IO h) #-}
548620
-- | Add a run to the levels, and propagate merges.
549621
--
@@ -562,8 +634,9 @@ addRunToLevels ::
562634
-> Ref (Run m h)
563635
-> ActionRegistry m
564636
-> Levels m h
637+
-> UnionLevel m h
565638
-> m (Levels m h)
566-
addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = do
639+
addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul = do
567640
go (LevelNo 1) (V.singleton r0) levels
568641
where
569642
-- NOTE: @go@ is based on the @increment@ function from the
@@ -578,16 +651,16 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
578651
go !ln rs (V.uncons -> Nothing) = do
579652
traceWith tr $ AtLevel ln TraceAddLevel
580653
-- Make a new level
581-
let policyForLevel = mergePolicyForLevel confMergePolicy ln V.empty
654+
let policyForLevel = mergePolicyForLevel confMergePolicy ln V.empty ul
582655
ir <- newMerge policyForLevel MergeLastLevel ln rs
583656
return $! V.singleton $ Level ir V.empty
584657
go !ln rs' (V.uncons -> Just (Level ir rs, ls)) = do
585658
r <- expectCompletedMerge ln ir
586-
case mergePolicyForLevel confMergePolicy ln ls of
659+
case mergePolicyForLevel confMergePolicy ln ls ul of
587660
-- If r is still too small for this level then keep it and merge again
588661
-- with the incoming runs.
589662
LevelTiering | Run.size r <= maxRunSize' conf LevelTiering (pred ln) -> do
590-
let mergelast = mergeLastForLevel ls
663+
let mergelast = mergeLastForLevel ls ul
591664
ir' <- newMerge LevelTiering mergelast ln (rs' `V.snoc` r)
592665
pure $! Level ir' rs `V.cons` ls
593666
-- This tiering level is now full. We take the completed merged run
@@ -601,7 +674,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
601674
-- This tiering level is not yet full. We move the completed merged run
602675
-- into the level proper, and start the new merge for the incoming runs.
603676
LevelTiering -> do
604-
let mergelast = mergeLastForLevel ls
677+
let mergelast = mergeLastForLevel ls ul
605678
ir' <- newMerge LevelTiering mergelast ln rs'
606679
traceWith tr $ AtLevel ln
607680
$ TraceAddRun
@@ -722,10 +795,12 @@ maxRunSize' :: TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
722795
maxRunSize' config policy ln =
723796
maxRunSize (confSizeRatio config) (confWriteBufferAlloc config) policy ln
724797

725-
mergeLastForLevel :: Levels m h -> MergeType
726-
mergeLastForLevel levels
727-
| V.null levels = MergeLastLevel
728-
| otherwise = MergeMidLevel
798+
-- | If there are no further levels provided, this level is the last one.
799+
-- However, if a 'Union' is present, it acts as another (last) level.
800+
mergeLastForLevel :: Levels m h -> UnionLevel m h -> MergeType
801+
mergeLastForLevel levels unionLevel
802+
| V.null levels, NoUnion <- unionLevel = MergeLastLevel
803+
| otherwise = MergeMidLevel
729804

730805
levelIsFull :: SizeRatio -> V.Vector run -> Bool
731806
levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr)

0 commit comments

Comments
 (0)