Skip to content

Commit e755b82

Browse files
committed
create MergingRun through safe interface
Initialising a regular MergingRun even for OneShot merging avoid the the need for directly creating completed MergingRuns and also re-uses the existing code path to step and complete a Merge, which means there is one less place to think about resource safety. To be able to assert that the OneShot merge really completed, as well as access the resulting run, we make expectCompleted not release the MergingRun. Similarly, we restore a completed MergingRun snapshot through a safe interface. We also remove MergeKnownCompleted from the snapshot format, which previously allowed to represent inconsistent states. It can instead be re-constructed from the MergingRunState. We also don't pass the TempRegistry across the module boundary any more. The module can use its own where necessary.
1 parent 67a96bf commit e755b82

File tree

5 files changed

+149
-147
lines changed

5 files changed

+149
-147
lines changed

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ module Database.LSMTree.Internal.MergeSchedule (
2828
) where
2929

3030
import Control.Concurrent.Class.MonadMVar.Strict
31-
import Control.Monad ((<$!>))
3231
import Control.Monad.Class.MonadST (MonadST)
3332
import Control.Monad.Class.MonadSTM (MonadSTM (..))
3433
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..))
@@ -38,7 +37,6 @@ import Control.TempRegistry
3837
import Control.Tracer
3938
import Data.BloomFilter (Bloom)
4039
import Data.Foldable (fold)
41-
import Data.Primitive.PrimVar
4240
import qualified Data.Vector as V
4341
import Database.LSMTree.Internal.Assertions (assert)
4442
import Database.LSMTree.Internal.Config
@@ -93,9 +91,10 @@ data MergeTrace =
9391
RunBloomFilterAlloc
9492
MergePolicyForLevel
9593
Merge.Level
96-
| TraceCompletedMerge
94+
| TraceCompletedMerge -- TODO: currently not traced for Incremental merges
9795
NumEntries -- ^ Size of output run
9896
RunNumber
97+
-- | This is traced at the latest point the merge could complete.
9998
| TraceExpectCompletedMerge
10099
RunNumber
101100
| TraceNewMergeSingleRun
@@ -181,7 +180,7 @@ mkLevelsCache ::
181180
mkLevelsCache reg lvls = do
182181
rs <- foldRunAndMergeM
183182
(fmap V.singleton . dupRun)
184-
(MR.duplicateRuns reg)
183+
(\mr -> allocateTemp reg (MR.duplicateRuns mr) (V.mapM_ releaseRef))
185184
lvls
186185
pure $! LevelsCache_ {
187186
cachedRuns = rs
@@ -698,18 +697,21 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
698697
expectCompletedMerge ln ir = do
699698
r <- case ir of
700699
Single r -> pure r
701-
Merging mr -> MR.expectCompleted reg mr
700+
Merging mr -> do
701+
r <- allocateTemp reg (MR.expectCompleted mr) releaseRef
702+
freeTemp reg (releaseRef mr)
703+
pure r
702704
traceWith tr $ AtLevel ln $
703705
TraceExpectCompletedMerge (Run.runFsPathsNumber r)
704706
pure r
705707

706-
-- TODO: refactor, pull to top level?
708+
-- Takes ownership of the runs passed.
707709
newMerge :: MergePolicyForLevel
708710
-> Merge.Level
709711
-> LevelNo
710712
-> V.Vector (Ref (Run m h))
711713
-> m (IncomingRun m h)
712-
newMerge mergePolicy mergelast ln rs
714+
newMerge mergePolicy mergeLevel ln rs
713715
| Just (r, rest) <- V.uncons rs
714716
, V.null rest = do
715717
traceWith tr $ AtLevel ln $
@@ -723,29 +725,33 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
723725
!alloc = bloomFilterAllocForLevel conf ln
724726
!runPaths = Paths.runPath root (uniqueToRunNumber n)
725727
traceWith tr $ AtLevel ln $
726-
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergelast
727-
let numInputRuns = NumRuns $ V.length rs
728-
let numInputEntries = V.foldMap' Run.size rs
728+
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel
729+
-- TODO: There currently is a resource management bug that happens if an
730+
-- exception occurs after calling MR.new. In this case, all changes roll
731+
-- back, so some of the runs in rs will live in the Levels structure at
732+
-- their original places again. However, we passed their references to
733+
-- the MergingRun, which gets aborted, releasing the run references.
734+
-- Instead of passing the original references into newMerge, we have to
735+
-- duplicate the ones that previously existed in the level and then
736+
-- freeTemp the original ones. This way, on the happy path the result is
737+
-- the same, but if an exception occurs, the original references do not
738+
-- get released.
739+
mr <- allocateTemp reg
740+
(MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs)
741+
releaseRef
729742
case confMergeSchedule of
743+
Incremental -> pure ()
730744
OneShot -> do
731-
r <- allocateTemp reg
732-
(mergeRuns resolve hfs hbio caching alloc runPaths mergelast rs)
733-
releaseRef
734-
traceWith tr $ AtLevel ln $
735-
TraceCompletedMerge (Run.size r)
736-
(Run.runFsPathsNumber r)
737-
V.mapM_ (freeTemp reg . releaseRef) rs
738-
Merging <$!> MR.unsafeNew mergePolicy numInputRuns numInputEntries MR.MergeKnownCompleted (MR.CompletedMerge r)
739-
740-
Incremental -> do
741-
mergeMaybe <- allocateMaybeTemp reg
742-
(Merge.new hfs hbio caching alloc mergelast resolve runPaths rs)
743-
Merge.abort
744-
case mergeMaybe of
745-
Nothing -> error "newMerge: merges can not be empty"
746-
Just m -> do
747-
spentCreditsVar <- MR.SpentCreditsVar <$> newPrimVar 0
748-
Merging <$!> MR.unsafeNew mergePolicy numInputRuns numInputEntries MR.MergeMaybeCompleted (MR.OngoingMerge rs spentCreditsVar m)
745+
let !required = MR.Credits (unNumEntries (V.foldMap' Run.size rs))
746+
let !thresh = creditThresholdForLevel conf ln
747+
MR.supplyCredits required thresh mr
748+
-- This ensures the merge is really completed. However, we don't
749+
-- release the merge yet and only briefly inspect the resulting run.
750+
bracket (MR.expectCompleted mr) releaseRef $ \r ->
751+
traceWith tr $ AtLevel ln $
752+
TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r)
753+
754+
return (Merging mr)
749755

750756
-- $setup
751757
-- >>> import Database.LSMTree.Internal.Entry
@@ -795,23 +801,6 @@ mergeLastForLevel levels
795801
levelIsFull :: SizeRatio -> V.Vector run -> Bool
796802
levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr)
797803

798-
{-# SPECIALISE mergeRuns :: ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RunBloomFilterAlloc -> RunFsPaths -> Merge.Level -> V.Vector (Ref (Run IO h)) -> IO (Ref (Run IO h)) #-}
799-
mergeRuns ::
800-
(MonadMask m, MonadST m, MonadSTM m)
801-
=> ResolveSerialisedValue
802-
-> HasFS m h
803-
-> HasBlockIO m h
804-
-> RunDataCaching
805-
-> RunBloomFilterAlloc
806-
-> RunFsPaths
807-
-> Merge.Level
808-
-> V.Vector (Ref (Run m h))
809-
-> m (Ref (Run m h))
810-
mergeRuns resolve hfs hbio caching alloc runPaths mergeLevel runs = do
811-
Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs >>= \case
812-
Nothing -> error "mergeRuns: no inputs"
813-
Just m -> Merge.stepsToCompletion m 1024
814-
815804
{-------------------------------------------------------------------------------
816805
Credits
817806
-------------------------------------------------------------------------------}

src/Database/LSMTree/Internal/MergingRun.hs

Lines changed: 85 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
-- | An incremental merge of multiple runs.
66
module Database.LSMTree.Internal.MergingRun (
77
MergingRun (..)
8-
, unsafeNew
8+
, new
9+
, newCompleted
910
, duplicateRuns
1011
, supplyCredits
1112
, expectCompleted
@@ -32,14 +33,21 @@ import Control.Monad.Class.MonadThrow (MonadCatch (bracketOnError),
3233
import Control.Monad.Primitive
3334
import Control.RefCount
3435
import Control.TempRegistry
36+
import Data.Maybe (fromMaybe)
3537
import Data.Primitive.MutVar
3638
import Data.Primitive.PrimVar
3739
import qualified Data.Vector as V
3840
import Database.LSMTree.Internal.Assertions (assert)
3941
import Database.LSMTree.Internal.Entry (NumEntries (..), unNumEntries)
42+
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
4043
import Database.LSMTree.Internal.Merge (Merge, StepResult (..))
4144
import qualified Database.LSMTree.Internal.Merge as Merge
45+
import Database.LSMTree.Internal.Paths (RunFsPaths (..))
4246
import Database.LSMTree.Internal.Run (Run)
47+
import qualified Database.LSMTree.Internal.Run as Run
48+
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc)
49+
import System.FS.API (HasFS)
50+
import System.FS.BlockIO.API (HasBlockIO)
4351

4452
data MergingRun m h = MergingRun {
4553
mergePolicy :: !MergePolicyForLevel
@@ -103,16 +111,65 @@ instance NFData MergeKnownCompleted where
103111
rnf MergeKnownCompleted = ()
104112
rnf MergeMaybeCompleted = ()
105113

106-
{-# SPECIALISE unsafeNew ::
114+
{-# SPECIALISE new ::
115+
HasFS IO h
116+
-> HasBlockIO IO h
117+
-> ResolveSerialisedValue
118+
-> Run.RunDataCaching
119+
-> RunBloomFilterAlloc
120+
-> Merge.Level
121+
-> MergePolicyForLevel
122+
-> RunFsPaths
123+
-> V.Vector (Ref (Run IO h))
124+
-> IO (Ref (MergingRun IO h)) #-}
125+
-- | Create a new merging run, returning a reference to it that must ultimately
126+
-- be released via 'releaseRef'.
127+
--
128+
-- Takes over ownership of the references to the runs passed.
129+
--
130+
-- This function should be run with asynchronous exceptions masked to prevent
131+
-- failing after internal resources have already been created.
132+
new ::
133+
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
134+
=> HasFS m h
135+
-> HasBlockIO m h
136+
-> ResolveSerialisedValue
137+
-> Run.RunDataCaching
138+
-> RunBloomFilterAlloc
139+
-> Merge.Level
140+
-> MergePolicyForLevel
141+
-> RunFsPaths
142+
-> V.Vector (Ref (Run m h))
143+
-> m (Ref (MergingRun m h))
144+
new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do
145+
merge <- fromMaybe (error "newMerge: merges can not be empty")
146+
<$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs
147+
let numInputRuns = NumRuns $ V.length runs
148+
let numInputEntries = V.foldMap' Run.size runs
149+
spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
150+
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
151+
OngoingMerge runs spentCreditsVar merge
152+
153+
{-# SPECIALISE newCompleted ::
107154
MergePolicyForLevel
108155
-> NumRuns
109156
-> NumEntries
110-
-> MergeKnownCompleted
111-
-> MergingRunState IO h
157+
-> Ref (Run IO h)
112158
-> IO (Ref (MergingRun IO h)) #-}
113-
-- | This allows constructing ill-formed MergingRuns, but the flexibility is
114-
-- needed for creating a merging run that is already Completed, as well as
115-
-- opening a merging run from a snapshot.
159+
-- | Create a merging run that is already in the completed state, returning a
160+
-- reference that must ultimately be released via 'releaseRef'.
161+
newCompleted ::
162+
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
163+
=> MergePolicyForLevel
164+
-> NumRuns
165+
-> NumEntries
166+
-> Ref (Run m h)
167+
-> m (Ref (MergingRun m h))
168+
newCompleted mergePolicy numInputRuns numInputEntries run = do
169+
unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $
170+
CompletedMerge run
171+
172+
{-# INLINE unsafeNew #-}
116173
unsafeNew ::
117174
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
118175
=> MergePolicyForLevel
@@ -150,20 +207,18 @@ unsafeNew mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do
150207

151208
-- | Create references to the runs that should be queried for lookups.
152209
-- In particular, if the merge is not complete, these are the input runs.
153-
{-# SPECIALISE duplicateRuns :: TempRegistry IO -> Ref (MergingRun IO h) -> IO (V.Vector (Ref (Run IO h))) #-}
210+
{-# SPECIALISE duplicateRuns :: Ref (MergingRun IO h) -> IO (V.Vector (Ref (Run IO h))) #-}
154211
duplicateRuns ::
155212
(PrimMonad m, MonadMVar m, MonadMask m)
156-
=> TempRegistry m
157-
-> Ref (MergingRun m h)
213+
=> Ref (MergingRun m h)
158214
-> m (V.Vector (Ref (Run m h)))
159-
duplicateRuns reg (DeRef mr) =
215+
duplicateRuns (DeRef mr) =
160216
-- We take the references while holding the MVar to make sure the MergingRun
161217
-- does not get completed concurrently before we are done.
162218
withMVar (mergeState mr) $ \case
163-
CompletedMerge r -> V.singleton <$> dupRun r
164-
OngoingMerge rs _ _ -> V.mapM dupRun rs
165-
where
166-
dupRun r = allocateTemp reg (dupRef r) releaseRef
219+
CompletedMerge r -> V.singleton <$> dupRef r
220+
OngoingMerge rs _ _ -> withTempRegistry $ \reg ->
221+
V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) rs
167222

168223
{-------------------------------------------------------------------------------
169224
Credits
@@ -331,19 +386,19 @@ tryTakeUnspentCredits ::
331386
tryTakeUnspentCredits
332387
unspentCreditsVar@(UnspentCreditsVar !var)
333388
thresh@(CreditThreshold !creditsThresh)
334-
(Credits !prev)
335-
| prev < creditsThresh = pure Nothing
389+
(Credits !before)
390+
| before < creditsThresh = pure Nothing
336391
| otherwise = do
337392
-- numThresholds is guaranteed to be >= 1
338-
let !numThresholds = prev `div` creditsThresh
393+
let !numThresholds = before `div` creditsThresh
339394
!creditsToTake = numThresholds * creditsThresh
340-
!new = prev - creditsToTake
341-
assert (new < creditsThresh) $ pure ()
342-
prev' <- casInt var prev new
343-
if prev' == prev then
395+
!after = before - creditsToTake
396+
assert (after < creditsThresh) $ pure ()
397+
before' <- casInt var before after
398+
if before' == before then
344399
pure (Just (Credits creditsToTake))
345400
else
346-
tryTakeUnspentCredits unspentCreditsVar thresh (Credits prev')
401+
tryTakeUnspentCredits unspentCreditsVar thresh (Credits before')
347402

348403
{-# SPECIALISE putBackUnspentCredits ::
349404
UnspentCreditsVar RealWorld
@@ -446,13 +501,14 @@ completeMerge mergeVar mergeKnownCompletedVar = do
446501
pure $! CompletedMerge r
447502

448503
{-# SPECIALISE expectCompleted ::
449-
TempRegistry IO
450-
-> Ref (MergingRun IO h)
504+
Ref (MergingRun IO h)
451505
-> IO (Ref (Run IO h)) #-}
506+
-- | This does /not/ release the reference, but allocates a new reference for
507+
-- the returned run, which must be released at some point.
452508
expectCompleted ::
453509
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m)
454-
=> TempRegistry m -> Ref (MergingRun m h) -> m (Ref (Run m h))
455-
expectCompleted reg mr@(DeRef MergingRun {..}) = do
510+
=> Ref (MergingRun m h) -> m (Ref (Run m h))
511+
expectCompleted (DeRef MergingRun {..}) = do
456512
knownCompleted <- readMutVar mergeKnownCompleted
457513
-- The merge is not guaranteed to be complete, so we do the remaining steps
458514
when (knownCompleted == MergeMaybeCompleted) $ do
@@ -462,13 +518,9 @@ expectCompleted reg mr@(DeRef MergingRun {..}) = do
462518
when isMergeDone $ completeMerge mergeState mergeKnownCompleted
463519
-- TODO: can we think of a check to see if we did not do too much work
464520
-- here?
465-
r <- withMVar mergeState $ \case
466-
CompletedMerge r -> pure r
521+
withMVar mergeState $ \case
522+
CompletedMerge r -> dupRef r -- return a fresh reference to the run
467523
OngoingMerge{} -> do
468524
-- If the algorithm finds an ongoing merge here, then it is a bug in
469525
-- our merge sceduling algorithm. As such, we throw a pure error.
470526
error "expectCompleted: expected a completed merge, but found an ongoing merge"
471-
-- return a fresh reference to the run
472-
r' <- allocateTemp reg (dupRef r) releaseRef
473-
freeTemp reg (releaseRef mr)
474-
pure r'

0 commit comments

Comments
 (0)