Skip to content

Commit 2b12878

Browse files
committed
Refactor IncomingRun constructor functions
We want to share merging run types and code better once we flesh out the merging tree functionality -- which uses merging runs. In particulur snapshot restore will need to construct merging runs in three places, and we want to reuse the same code in each place. Currently, the higher level constructors for merging runs are coupled with incoming runs, because that's the only place they are used now. But once we have merging runs in merging trees, then they're no longer in an incoming run. So we want to be able to reuse that without needing to rely on an incoming run. So we have to make incoming runs "thinner", and break up the policy functions for making merging runs at a given level.
1 parent 585b257 commit 2b12878

File tree

2 files changed

+128
-121
lines changed

2 files changed

+128
-121
lines changed

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 94 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ module Database.LSMTree.Internal.MergeSchedule (
2626
, IncomingRun (..)
2727
, MergePolicyForLevel (..)
2828
, newIncomingSingleRun
29-
, newIncomingCompletedMergingRun
3029
, newIncomingMergingRun
3130
, releaseIncomingRun
3231
, supplyCreditsIncomingRun
3332
, snapshotIncomingRun
33+
, mergingRunParamsForLevel
3434
-- * Union level
3535
, UnionLevel (..)
3636
-- * Flushes and scheduled merges
@@ -45,6 +45,8 @@ module Database.LSMTree.Internal.MergeSchedule (
4545
, creditThresholdForLevel
4646
, NominalDebt (..)
4747
, NominalCredits (..)
48+
, nominalDebtAsCredits
49+
, nominalDebtForLevel
4850
-- * Exported for testing
4951
, addWriteBufferEntries
5052
) where
@@ -67,14 +69,14 @@ import Database.LSMTree.Internal.Assertions (assert)
6769
import Database.LSMTree.Internal.Config
6870
import Database.LSMTree.Internal.Entry (Entry, NumEntries (..),
6971
unNumEntries)
70-
import Database.LSMTree.Internal.Index (Index)
72+
import Database.LSMTree.Internal.Index (Index, IndexType)
7173
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
7274
import Database.LSMTree.Internal.MergingRun (MergeCredits (..),
7375
MergeDebt (..), MergingRun, NumRuns (..))
7476
import qualified Database.LSMTree.Internal.MergingRun as MR
7577
import Database.LSMTree.Internal.MergingTree (MergingTree)
7678
import Database.LSMTree.Internal.Paths (ActiveDir, RunFsPaths (..),
77-
SessionRoot (..))
79+
SessionRoot)
7880
import qualified Database.LSMTree.Internal.Paths as Paths
7981
import Database.LSMTree.Internal.Run (Run, RunDataCaching (..))
8082
import qualified Database.LSMTree.Internal.Run as Run
@@ -121,9 +123,6 @@ data MergeTrace =
121123
| TraceNewMergeSingleRun
122124
NumEntries -- ^ Size of run
123125
RunNumber
124-
| TraceNewMergeCompletedRun
125-
NumEntries -- ^ Size of run
126-
RunNumber
127126
| TraceCompletedMerge -- TODO: currently not traced for Incremental merges
128127
NumEntries -- ^ Size of output run
129128
RunNumber
@@ -414,100 +413,23 @@ releaseIncomingRun ::
414413
releaseIncomingRun (Single r) = releaseRef r
415414
releaseIncomingRun (Merging _ _ _ mr) = releaseRef mr
416415

417-
{-# SPECIALISE newIncomingSingleRun ::
418-
Tracer IO (AtLevel MergeTrace)
419-
-> LevelNo
420-
-> Ref (Run IO h)
421-
-> IO (IncomingRun IO h) #-}
416+
{-# INLINE newIncomingSingleRun #-}
422417
newIncomingSingleRun ::
423418
(PrimMonad m, MonadThrow m)
424-
=> Tracer m (AtLevel MergeTrace)
425-
-> LevelNo
426-
-> Ref (Run m h)
427-
-> m (IncomingRun m h)
428-
newIncomingSingleRun tr ln r = do
429-
r' <- dupRef r
430-
traceWith tr $ AtLevel ln $
431-
TraceNewMergeSingleRun (Run.size r') (Run.runFsPathsNumber r')
432-
return (Single r')
433-
434-
{-# SPECIALISE newIncomingCompletedMergingRun ::
435-
Tracer IO (AtLevel MergeTrace)
436-
-> TableConfig
437-
-> LevelNo
438-
-> MergePolicyForLevel
439-
-> NumRuns
440-
-> MergeDebt
441-
-> Ref (Run IO h)
442-
-> IO (IncomingRun IO h) #-}
443-
newIncomingCompletedMergingRun ::
444-
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m)
445-
=> Tracer m (AtLevel MergeTrace)
446-
-> TableConfig
447-
-> LevelNo
448-
-> MergePolicyForLevel
449-
-> NumRuns
450-
-> MergeDebt
451-
-> Ref (Run m h)
419+
=> Ref (Run m h)
452420
-> m (IncomingRun m h)
453-
newIncomingCompletedMergingRun tr conf ln mergePolicy nr mergeDebt r = do
454-
traceWith tr $ AtLevel ln $
455-
TraceNewMergeCompletedRun (Run.size r) (Run.runFsPathsNumber r)
456-
mr <- MR.newCompleted nr mergeDebt r
457-
let nominalDebt = nominalDebtForLevel conf ln
458-
nominalCredits = nominalDebtAsCredits nominalDebt
459-
nominalCreditsVar <- newPrimVar nominalCredits
460-
return (Merging mergePolicy nominalDebt nominalCreditsVar mr)
421+
newIncomingSingleRun r = Single <$> dupRef r
461422

462-
{-# SPECIALISE newIncomingMergingRun ::
463-
Tracer IO (AtLevel MergeTrace)
464-
-> HasFS IO h
465-
-> HasBlockIO IO h
466-
-> ActiveDir
467-
-> UniqCounter IO
468-
-> TableConfig
469-
-> ResolveSerialisedValue
470-
-> MergePolicyForLevel
471-
-> MR.LevelMergeType
472-
-> LevelNo
473-
-> V.Vector (Ref (Run IO h))
474-
-> IO (IncomingRun IO h) #-}
423+
{-# INLINE newIncomingMergingRun #-}
475424
newIncomingMergingRun ::
476-
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m)
477-
=> Tracer m (AtLevel MergeTrace)
478-
-> HasFS m h
479-
-> HasBlockIO m h
480-
-> ActiveDir
481-
-> UniqCounter m
482-
-> TableConfig
483-
-> ResolveSerialisedValue
484-
-> MergePolicyForLevel
485-
-> MR.LevelMergeType
486-
-> LevelNo
487-
-> V.Vector (Ref (Run m h))
425+
PrimMonad m
426+
=> MergePolicyForLevel
427+
-> NominalDebt
428+
-> Ref (MergingRun MR.LevelMergeType m h)
488429
-> m (IncomingRun m h)
489-
newIncomingMergingRun tr hfs hbio activeDir uc
490-
conf@TableConfig {
491-
confDiskCachePolicy,
492-
confFencePointerIndex
493-
}
494-
resolve mergePolicy mergeType ln rs = do
495-
!rn <- uniqueToRunNumber <$> incrUniqCounter uc
496-
let !caching = diskCachePolicyForLevel confDiskCachePolicy ln
497-
!alloc = bloomFilterAllocForLevel conf ln
498-
!indexType = indexTypeForRun confFencePointerIndex
499-
!runPaths = Paths.runPath activeDir rn
500-
traceWith tr $ AtLevel ln $
501-
TraceNewMerge (V.map Run.size rs) (runNumber runPaths)
502-
caching alloc mergePolicy mergeType
503-
mr <- MR.new hfs hbio resolve caching
504-
alloc indexType mergeType
505-
runPaths rs
506-
let nominalDebt = nominalDebtForLevel conf ln
507-
nominalCredits = NominalCredits 0
508-
nominalCreditsVar <- newPrimVar nominalCredits
509-
assert (MR.totalMergeDebt mr <= maxMergeDebt conf mergePolicy ln) $
510-
return (Merging mergePolicy nominalDebt nominalCreditsVar mr)
430+
newIncomingMergingRun mergePolicy nominalDebt mr = do
431+
nominalCreditsVar <- newPrimVar (NominalCredits 0)
432+
return (Merging mergePolicy nominalDebt nominalCreditsVar mr)
511433

512434
{-# SPECIALISE supplyCreditsIncomingRun ::
513435
TableConfig
@@ -1063,13 +985,9 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul
1063985
-> m (IncomingRun m h)
1064986
newMerge mergePolicy mergeType ln rs = do
1065987
ir <- withRollback reg
1066-
(case V.uncons rs of
1067-
Just (r, rest) | V.null rest
1068-
-> newIncomingSingleRun tr ln r
1069-
_ -> newIncomingMergingRun tr hfs hbio
1070-
(Paths.activeDir root) uc
1071-
conf resolve mergePolicy mergeType
1072-
ln rs)
988+
(newIncomingRunAtLevel tr hfs hbio
989+
root uc conf resolve
990+
mergePolicy mergeType ln rs)
1073991
releaseIncomingRun
1074992
-- The runs will end up inside the incoming/merging run, with fresh
1075993
-- references (since newIncoming* will make duplicates).
@@ -1079,6 +997,81 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul
1079997
Incremental -> pure ()
1080998
OneShot -> immediatelyCompleteIncomingRun tr conf ln ir
1081999
return ir
1000+
{-# SPECIALISE newIncomingRunAtLevel ::
1001+
Tracer IO (AtLevel MergeTrace)
1002+
-> HasFS IO h
1003+
-> HasBlockIO IO h
1004+
-> SessionRoot
1005+
-> UniqCounter IO
1006+
-> TableConfig
1007+
-> ResolveSerialisedValue
1008+
-> MergePolicyForLevel
1009+
-> MR.LevelMergeType
1010+
-> LevelNo
1011+
-> V.Vector (Ref (Run IO h))
1012+
-> IO (IncomingRun IO h) #-}
1013+
newIncomingRunAtLevel ::
1014+
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
1015+
=> Tracer m (AtLevel MergeTrace)
1016+
-> HasFS m h
1017+
-> HasBlockIO m h
1018+
-> SessionRoot
1019+
-> UniqCounter m
1020+
-> TableConfig
1021+
-> ResolveSerialisedValue
1022+
-> MergePolicyForLevel
1023+
-> MR.LevelMergeType
1024+
-> LevelNo
1025+
-> V.Vector (Ref (Run m h))
1026+
-> m (IncomingRun m h)
1027+
newIncomingRunAtLevel tr hfs hbio
1028+
root uc conf resolve
1029+
mergePolicy mergeType ln rs
1030+
| Just (r, rest) <- V.uncons rs, V.null rest = do
1031+
1032+
traceWith tr $ AtLevel ln $
1033+
TraceNewMergeSingleRun (Run.size r) (Run.runFsPathsNumber r)
1034+
1035+
newIncomingSingleRun r
1036+
1037+
| otherwise = do
1038+
1039+
uniq <- incrUniqCounter uc
1040+
let (caching, alloc, indexType, runPaths) =
1041+
mergingRunParamsForLevel (Paths.activeDir root) conf uniq ln
1042+
1043+
traceWith tr $ AtLevel ln $
1044+
TraceNewMerge (V.map Run.size rs) (runNumber runPaths)
1045+
caching alloc mergePolicy mergeType
1046+
1047+
mr <- MR.new hfs hbio resolve caching
1048+
alloc indexType mergeType
1049+
runPaths rs
1050+
1051+
assert (MR.totalMergeDebt mr <= maxMergeDebt conf mergePolicy ln) $ pure ()
1052+
1053+
let nominalDebt = nominalDebtForLevel conf ln
1054+
newIncomingMergingRun mergePolicy nominalDebt mr
1055+
1056+
mergingRunParamsForLevel ::
1057+
ActiveDir
1058+
-> TableConfig
1059+
-> Unique
1060+
-> LevelNo
1061+
-> (RunDataCaching, RunBloomFilterAlloc, IndexType, RunFsPaths)
1062+
mergingRunParamsForLevel dir
1063+
conf@TableConfig {
1064+
confDiskCachePolicy,
1065+
confFencePointerIndex
1066+
}
1067+
unique ln =
1068+
(caching, alloc, indexType, runPaths)
1069+
where
1070+
!caching = diskCachePolicyForLevel confDiskCachePolicy ln
1071+
!alloc = bloomFilterAllocForLevel conf ln
1072+
!indexType = indexTypeForRun confFencePointerIndex
1073+
!runNum = uniqueToRunNumber unique
1074+
!runPaths = Paths.runPath dir runNum
10821075

10831076
-- | We use levelling on the last level, unless that is also the first level.
10841077
mergePolicyForLevel ::

src/Database/LSMTree/Internal/Snapshot.hs

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import Control.Monad.Class.MonadST (MonadST)
3232
import Control.Monad.Class.MonadThrow (MonadMask, bracketOnError)
3333
import Control.Monad.Primitive (PrimMonad)
3434
import Control.RefCount
35-
import Control.Tracer (Tracer, nullTracer)
3635
import Data.Foldable (sequenceA_, traverse_)
3736
import Data.Text (Text)
3837
import Data.Traversable (for)
@@ -467,8 +466,6 @@ fromSnapLevels reg hfs hbio conf uc resolve dir (SnapLevels levels) =
467466
V.iforM levels $ \i -> fromSnapLevel (LevelNo (i+1))
468467
where
469468
-- TODO: we may wish to trace the merges created during snapshot restore:
470-
tr :: Tracer m (AtLevel MergeTrace)
471-
tr = nullTracer
472469

473470
fromSnapLevel :: LevelNo -> SnapLevel (Ref (Run m h)) -> m (Level m h)
474471
fromSnapLevel ln SnapLevel{snapIncoming, snapResidentRuns} = do
@@ -485,23 +482,40 @@ fromSnapLevels reg hfs hbio conf uc resolve dir (SnapLevels levels) =
485482
LevelNo
486483
-> SnapIncomingRun (Ref (Run m h))
487484
-> m (IncomingRun m h)
488-
fromSnapIncomingRun ln (SnapSingleRun run) =
489-
newIncomingSingleRun tr ln run
490-
491-
fromSnapIncomingRun ln (SnapMergingRun mpfl nr md _nc
492-
(SnapCompletedMerge r)) =
493-
newIncomingCompletedMergingRun tr conf ln mpfl nr md r
494-
495-
fromSnapIncomingRun ln (SnapMergingRun mpfl _nr _md nc
496-
(SnapOngoingMerge rs mt)) = do
497-
bracketOnError (newIncomingMergingRun tr hfs hbio dir uc
498-
conf resolve
499-
mpfl mt ln rs) releaseIncomingRun $ \ir -> do
500-
-- When a snapshot is created, merge progress is lost, so we have to
501-
-- redo merging work here. The MergeCredits in SnapMergingRun tracks
502-
-- how many credits were supplied before the snapshot was taken.
503-
supplyCreditsIncomingRun conf ln ir nc
504-
return ir
485+
fromSnapIncomingRun _ln (SnapSingleRun run) =
486+
newIncomingSingleRun run
487+
488+
fromSnapIncomingRun ln (SnapMergingRun mergePolicy nr mergeDebt _nc
489+
(SnapCompletedMerge r)) = do
490+
mr <- MR.newCompleted nr mergeDebt r
491+
let nominalDebt = nominalDebtForLevel conf ln
492+
nominalCredits = nominalDebtAsCredits nominalDebt
493+
ir <- newIncomingMergingRun mergePolicy nominalDebt mr
494+
-- This will do no real work, since the mr is completed, it'll just
495+
-- set the final nominal credits
496+
supplyCreditsIncomingRun conf ln ir nominalCredits
497+
return ir
498+
499+
fromSnapIncomingRun ln (SnapMergingRun mergePolicy _nr _md nc
500+
(SnapOngoingMerge rs mergeType)) =
501+
bracketOnError
502+
(do uniq <- incrUniqCounter uc
503+
let (caching, alloc, indexType, runPaths) =
504+
mergingRunParamsForLevel dir conf uniq ln
505+
MR.new hfs hbio resolve caching
506+
alloc indexType mergeType
507+
runPaths rs)
508+
releaseRef $ \mr -> do
509+
510+
let nominalDebt = nominalDebtForLevel conf ln
511+
ir <- newIncomingMergingRun mergePolicy nominalDebt mr
512+
513+
-- When a snapshot is created, merge progress is lost, so we have to
514+
-- redo merging work here. The MergeCredits in SnapMergingRun tracks
515+
-- how many credits were supplied before the snapshot was taken.
516+
--TODO: bracketOnError the MR.new for this:
517+
supplyCreditsIncomingRun conf ln ir nc
518+
return ir
505519

506520
{-------------------------------------------------------------------------------
507521
Hard links

0 commit comments

Comments
 (0)