Skip to content

Commit c182515

Browse files
committed
Change representation for snapshot of merging run
The goal is to make the representation of the snapshot of the merging run closer to being sufficient information to reconstruct the merging run upon snapshot restore. Previously we only embedded the merging run within an incoming run, and we used information from that context to reconstruct the merging run. In future we want to reuse the representation of snapshots of merging runs within merging trees, and those do not contain the same context information as an incoming run does. So to enable reuse, it makes sense to make the merging run snapshot representation more standalone. In particular:
* SnapIncomingRun no longer contains NumRuns or MergeDebt, and these get moved down to the SnapMergingRunState.
* SnapIncomingRun now stores the NominalDebt.
* SnapMergingRunState also gains the RunParams and MergeCredits.
1 parent 87a2aac commit c182515

File tree

10 files changed

+290
-108
lines changed

10 files changed

+290
-108
lines changed

src/Database/LSMTree/Internal/Config.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import qualified Monkey
4646

4747
newtype LevelNo = LevelNo Int
4848
deriving stock (Show, Eq)
49-
deriving newtype Enum
49+
deriving newtype (Enum, NFData)
5050

5151
{-------------------------------------------------------------------------------
5252
Table configuration

src/Database/LSMTree/Internal/Index.hs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ import Database.LSMTree.Internal.Serialise (SerialisedKey)
7272
data IndexType = Compact | Ordinary
7373
deriving stock (Eq, Show)
7474

75+
instance NFData IndexType where
76+
rnf Compact = ()
77+
rnf Ordinary = ()
78+
7579
-- * Indexes
7680

7781
-- | The type of supported indexes.
@@ -81,7 +85,6 @@ data Index
8185
deriving stock (Eq, Show)
8286

8387
instance NFData Index where
84-
8588
rnf (CompactIndex index) = rnf index
8689
rnf (OrdinaryIndex index) = rnf index
8790

src/Database/LSMTree/Internal/Merge.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module Database.LSMTree.Internal.Merge (
1717
, stepsToCompletionCounted
1818
, StepResult (..)
1919
, steps
20+
, mergeRunParams
2021
) where
2122

2223
import Control.DeepSeq (NFData (..))
@@ -66,6 +67,9 @@ data Merge t m h = Merge {
6667
, mergeHasBlockIO :: !(HasBlockIO m h)
6768
}
6869

70+
mergeRunParams :: Merge t m h -> RunParams
71+
mergeRunParams = Builder.runBuilderParams . mergeBuilder
72+
6973
-- | The current state of the merge.
7074
data MergeState =
7175
-- | There is still merging work to be done

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ import Database.LSMTree.Internal.Entry (Entry, NumEntries (..),
7272
import Database.LSMTree.Internal.Index (Index)
7373
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
7474
import Database.LSMTree.Internal.MergingRun (MergeCredits (..),
75-
MergeDebt (..), MergingRun, NumRuns (..), RunParams (..))
75+
MergeDebt (..), MergingRun, RunParams (..))
7676
import qualified Database.LSMTree.Internal.MergingRun as MR
7777
import Database.LSMTree.Internal.MergingTree (MergingTree)
7878
import Database.LSMTree.Internal.Paths (ActiveDir, RunFsPaths (..),
@@ -379,6 +379,7 @@ instance NFData MergePolicyForLevel where
379379
-- complete.
380380
newtype NominalDebt = NominalDebt Int
381381
deriving stock Eq
382+
deriving newtype (NFData)
382383

383384
-- | Merge credits that get supplied to a table's levels.
384385
--
@@ -611,31 +612,21 @@ immediatelyCompleteIncomingRun tr conf ln ir =
611612
IncomingRun IO h
612613
-> IO (Either (Ref (Run IO h))
613614
(MergePolicyForLevel,
614-
NumRuns,
615615
NominalDebt,
616616
NominalCredits,
617-
MergeDebt,
618-
MergeCredits,
619-
MR.MergingRunState MR.LevelMergeType IO h)) #-}
617+
Ref (MergingRun MR.LevelMergeType IO h))) #-}
620618
snapshotIncomingRun ::
621-
(PrimMonad m, MonadMVar m)
619+
PrimMonad m
622620
=> IncomingRun m h
623621
-> m (Either (Ref (Run m h))
624622
(MergePolicyForLevel,
625-
NumRuns,
626623
NominalDebt,
627624
NominalCredits,
628-
MergeDebt,
629-
MergeCredits,
630-
MR.MergingRunState MR.LevelMergeType m h))
625+
Ref (MergingRun MR.LevelMergeType m h)))
631626
snapshotIncomingRun (Single r) = pure (Left r)
632627
snapshotIncomingRun (Merging mergePolicy nominalDebt nominalCreditsVar mr) = do
633-
(numRuns, mergeDebt, mergeCredit, state) <- MR.snapshot mr
634628
nominalCredits <- readPrimVar nominalCreditsVar
635-
pure (Right (mergePolicy, numRuns,
636-
nominalDebt, nominalCredits,
637-
mergeDebt, mergeCredit,
638-
state))
629+
pure (Right (mergePolicy, nominalDebt, nominalCredits, mr))
639630

640631
{-------------------------------------------------------------------------------
641632
Union level

src/Database/LSMTree/Internal/RunAcc.hs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ module Database.LSMTree.Internal.RunAcc (
3131
, PageAcc.entryWouldFitInPage
3232
) where
3333

34+
import Control.DeepSeq (NFData (..))
3435
import Control.Exception (assert)
3536
import Control.Monad.ST.Strict
3637
import Data.BloomFilter (Bloom, MBloom)
@@ -78,12 +79,17 @@ data RunAcc s = RunAcc {
7879
-- | See 'Database.LSMTree.Internal.BloomFilterAlloc'
7980
data RunBloomFilterAlloc =
8081
-- | Bits per element in a filter
81-
RunAllocFixed Word64
82-
| RunAllocRequestFPR Double
82+
RunAllocFixed !Word64
83+
| RunAllocRequestFPR !Double
8384
-- | Total number of bits for a filter
84-
| RunAllocMonkey Word64
85+
| RunAllocMonkey !Word64
8586
deriving stock (Show, Eq)
8687

88+
instance NFData RunBloomFilterAlloc where
89+
rnf (RunAllocFixed a) = rnf a
90+
rnf (RunAllocRequestFPR a) = rnf a
91+
rnf (RunAllocMonkey a) = rnf a
92+
8793
-- | @'new' nentries@ starts an incremental run construction.
8894
--
8995
-- @nentries@ should be an upper bound on the expected number of entries in the

src/Database/LSMTree/Internal/RunBuilder.hs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ data RunParams = RunParams {
7878
runParamAlloc :: !RunBloomFilterAlloc,
7979
runParamIndex :: !IndexType
8080
}
81-
deriving stock Show
81+
deriving stock (Eq, Show)
82+
83+
instance NFData RunParams where
84+
rnf (RunParams a b c) = rnf a `seq` rnf b `seq` rnf c
8285

8386
-- | Should this run cache key\/ops data in memory?
8487
data RunDataCaching = CacheRunData | NoCacheRunData

src/Database/LSMTree/Internal/Snapshot.hs

Lines changed: 77 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ import Database.LSMTree.Internal.Paths (ActiveDir (..), ForBlob (..),
4848
ForKOps (..), NamedSnapshotDir (..), RunFsPaths (..),
4949
WriteBufferFsPaths (..),
5050
fromChecksumsFileForWriteBufferFiles, pathsForRunFiles,
51-
runChecksumsPath, writeBufferBlobPath,
51+
runChecksumsPath, runPath, writeBufferBlobPath,
5252
writeBufferChecksumsPath, writeBufferKOpsPath)
53-
import Database.LSMTree.Internal.Run (Run)
53+
import Database.LSMTree.Internal.Run (Run, RunParams)
5454
import qualified Database.LSMTree.Internal.Run as Run
5555
import Database.LSMTree.Internal.RunNumber
5656
import Database.LSMTree.Internal.UniqCounter (UniqCounter,
@@ -156,17 +156,16 @@ instance NFData r => NFData (SnapLevel r) where
156156
--
157157
data SnapIncomingRun r =
158158
SnapMergingRun !MergePolicyForLevel
159-
!NumRuns
160-
!MergeDebt -- ^ The total merge debt.
159+
!NominalDebt
161160
!NominalCredits -- ^ The nominal credits supplied, and that
162161
-- need to be supplied on snapshot open.
163162
!(SnapMergingRunState MR.LevelMergeType r)
164163
| SnapSingleRun !r
165164
deriving stock (Eq, Functor, Foldable, Traversable)
166165

167166
instance NFData r => NFData (SnapIncomingRun r) where
168-
rnf (SnapMergingRun a b c d e) =
169-
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e
167+
rnf (SnapMergingRun a b c d) =
168+
rnf a `seq` rnf b `seq` rnf c `seq` rnf d
170169
rnf (SnapSingleRun a) = rnf a
171170

172171
-- | The total number of supplied credits. This total is used on snapshot load
@@ -176,13 +175,13 @@ newtype SuppliedCredits = SuppliedCredits { getSuppliedCredits :: Int }
176175
deriving newtype NFData
177176

178177
data SnapMergingRunState t r =
179-
SnapCompletedMerge !r
180-
| SnapOngoingMerge !(V.Vector r) !t
178+
SnapCompletedMerge !NumRuns !MergeDebt !r
179+
| SnapOngoingMerge !RunParams !MergeCredits !(V.Vector r) !t
181180
deriving stock (Eq, Functor, Foldable, Traversable)
182181

183182
instance (NFData t, NFData r) => NFData (SnapMergingRunState t r) where
184-
rnf (SnapCompletedMerge a) = rnf a
185-
rnf (SnapOngoingMerge a b) = rnf a `seq` rnf b
183+
rnf (SnapCompletedMerge a b c) = rnf a `seq` rnf b `seq` rnf c
184+
rnf (SnapOngoingMerge a b c d) = rnf a `seq` rnf b `seq` rnf c `seq` rnf d
186185

187186
{-------------------------------------------------------------------------------
188187
Conversion to levels snapshot format
@@ -220,31 +219,34 @@ toSnapIncomingRun ir = do
220219
case s of
221220
Left r -> pure $! SnapSingleRun r
222221
Right (mergePolicy,
223-
numRuns,
224-
_nominalDebt, -- not stored
222+
nominalDebt,
225223
nominalCredits,
226-
mergeDebt,
227-
_mergeCredits, -- not stored
228-
mergingRunState) -> do
224+
mergingRun) -> do
229225
-- We need to know how many credits were supplied so we can restore merge
230226
-- work on snapshot load.
231-
-- TODO: MR.snapshot needs to return duplicated run references, and we
232-
-- need to arrange to release them when the snapshoting is done.
233-
let smrs = toSnapMergingRunState mergingRunState
234-
pure $!
235-
SnapMergingRun
236-
mergePolicy
237-
numRuns
238-
mergeDebt
239-
nominalCredits
240-
smrs
227+
smrs <- toSnapMergingRunState mergingRun
228+
pure $! SnapMergingRun mergePolicy nominalDebt nominalCredits smrs
241229

230+
{-# SPECIALISE toSnapMergingRunState ::
231+
Ref (MR.MergingRun t IO h)
232+
-> IO (SnapMergingRunState t (Ref (Run IO h))) #-}
242233
toSnapMergingRunState ::
243-
MR.MergingRunState t m h
244-
-> SnapMergingRunState t (Ref (Run m h))
245-
toSnapMergingRunState = \case
246-
MR.CompletedMerge r -> SnapCompletedMerge r
247-
MR.OngoingMerge rs m -> SnapOngoingMerge rs (Merge.mergeType m)
234+
(PrimMonad m, MonadMVar m)
235+
=> Ref (MR.MergingRun t m h)
236+
-> m (SnapMergingRunState t (Ref (Run m h)))
237+
toSnapMergingRunState !mr = do
238+
-- TODO: MR.snapshot needs to return duplicated run references, and we
239+
-- need to arrange to release them when the snapshotting is done.
240+
(numRuns, mergeDebt, mergeCredits, state) <- MR.snapshot mr
241+
case state of
242+
MR.CompletedMerge r ->
243+
pure $! SnapCompletedMerge numRuns mergeDebt r
244+
245+
MR.OngoingMerge rs m ->
246+
pure $! SnapOngoingMerge runParams mergeCredits rs mergeType
247+
where
248+
runParams = Merge.mergeRunParams m
249+
mergeType = Merge.mergeType m
248250

249251
{-------------------------------------------------------------------------------
250252
Write Buffer
@@ -485,35 +487,55 @@ fromSnapLevels reg hfs hbio conf uc resolve dir (SnapLevels levels) =
485487
fromSnapIncomingRun _ln (SnapSingleRun run) =
486488
newIncomingSingleRun run
487489

488-
fromSnapIncomingRun ln (SnapMergingRun mergePolicy nr mergeDebt _nc
489-
(SnapCompletedMerge r)) = do
490-
mr <- MR.newCompleted nr mergeDebt r
491-
let nominalDebt = nominalDebtForLevel conf ln
492-
nominalCredits = nominalDebtAsCredits nominalDebt
490+
fromSnapIncomingRun ln (SnapMergingRun mergePolicy nominalDebt
491+
nominalCredits smrs) = do
492+
mr <- fromSnapMergingRunState hfs hbio uc resolve dir smrs
493493
ir <- newIncomingMergingRun mergePolicy nominalDebt mr
494-
-- This will do no real work, since the mr is completed, it'll just
495-
-- set the final nominal credits
494+
-- This will set the correct nominal credits, but it will not do any more
495+
-- merging work because fromSnapMergingRunState already supplies all the
496+
-- merging credits.
496497
supplyCreditsIncomingRun conf ln ir nominalCredits
497498
return ir
498499

499-
fromSnapIncomingRun ln (SnapMergingRun mergePolicy _nr _md nc
500-
(SnapOngoingMerge rs mergeType)) =
501-
bracketOnError
502-
(do uniq <- incrUniqCounter uc
503-
let (runParams, runPaths) =
504-
mergingRunParamsForLevel dir conf uniq ln
505-
MR.new hfs hbio resolve runParams mergeType runPaths rs)
506-
releaseRef $ \mr -> do
507-
508-
let nominalDebt = nominalDebtForLevel conf ln
509-
ir <- newIncomingMergingRun mergePolicy nominalDebt mr
510-
511-
-- When a snapshot is created, merge progress is lost, so we have to
512-
-- redo merging work here. The MergeCredits in SnapMergingRun tracks
513-
-- how many credits were supplied before the snapshot was taken.
514-
--TODO: bracketOnError the MR.new for this:
515-
supplyCreditsIncomingRun conf ln ir nc
516-
return ir
500+
{-# SPECIALISE fromSnapMergingRunState ::
501+
MR.IsMergeType t
502+
=> HasFS IO h
503+
-> HasBlockIO IO h
504+
-> UniqCounter IO
505+
-> ResolveSerialisedValue
506+
-> ActiveDir
507+
-> SnapMergingRunState t (Ref (Run IO h))
508+
-> IO (Ref (MR.MergingRun t IO h)) #-}
509+
fromSnapMergingRunState ::
510+
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m, MR.IsMergeType t)
511+
=> HasFS m h
512+
-> HasBlockIO m h
513+
-> UniqCounter m
514+
-> ResolveSerialisedValue
515+
-> ActiveDir
516+
-> SnapMergingRunState t (Ref (Run m h))
517+
-> m (Ref (MR.MergingRun t m h))
518+
fromSnapMergingRunState _hfs _hbio _uc _resolve _dir
519+
(SnapCompletedMerge numRuns mergeDebt r) =
520+
MR.newCompleted numRuns mergeDebt r
521+
522+
fromSnapMergingRunState hfs hbio uc resolve dir
523+
(SnapOngoingMerge runParams mergeCredits
524+
rs mergeType) = do
525+
bracketOnError
526+
(do uniq <- incrUniqCounter uc
527+
let runPaths = runPath dir (uniqueToRunNumber uniq)
528+
MR.new hfs hbio resolve runParams mergeType runPaths rs)
529+
releaseRef $ \mr -> do
530+
-- When a snapshot is created, merge progress is lost, so we have to
531+
-- redo merging work here. The MergeCredits in SnapMergingRun tracks
532+
-- how many credits were supplied before the snapshot was taken.
533+
534+
--TODO: the threshold should be stored with the MergingRun
535+
-- here we want to supply the credits now, so we can use a threshold of 1
536+
let thresh = MR.CreditThreshold (MR.UnspentCredits 1)
537+
_ <- MR.supplyCreditsAbsolute mr thresh mergeCredits
538+
return mr
517539

518540
{-------------------------------------------------------------------------------
519541
Hard links

0 commit comments

Comments
 (0)