Skip to content

Commit 69b32e6

Browse files
committed
fix Ref handling in addRunToLevels and fromSnapLevels
1 parent e755b82 commit 69b32e6

File tree

4 files changed

+59
-32
lines changed

4 files changed

+59
-32
lines changed

src/Database/LSMTree/Internal.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,6 +1198,7 @@ openSnapshot sesh label tableType override snap resolve = do
11981198
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels
11991199
-- Convert from the snapshot format, restoring merge progress in the process
12001200
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels'
1201+
releaseRuns reg snapLevels'
12011202

12021203
tableCache <- mkLevelsCache reg tableLevels
12031204
newWith reg sesh seshEnv conf' am $! TableContent {

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,13 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
645645
where
646646
-- NOTE: @go@ is based on the @increment@ function from the
647647
-- @ScheduledMerges@ prototype.
648+
--
649+
-- Releases the vector of runs.
650+
go ::
651+
LevelNo
652+
-> V.Vector (Ref (Run m h))
653+
-> V.Vector (Level m h )
654+
-> m (V.Vector (Level m h))
648655
go !ln rs (V.uncons -> Nothing) = do
649656
traceWith tr $ AtLevel ln TraceAddLevel
650657
-- Make a new level
@@ -693,6 +700,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
693700
ir' <- newMerge LevelLevelling Merge.LastLevel ln (rs' `V.snoc` r)
694701
pure $! Level ir' V.empty `V.cons` V.empty
695702

703+
-- Releases the incoming run.
696704
expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h))
697705
expectCompletedMerge ln ir = do
698706
r <- case ir of
@@ -705,7 +713,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
705713
TraceExpectCompletedMerge (Run.runFsPathsNumber r)
706714
pure r
707715

708-
-- Takes ownership of the runs passed.
716+
-- Releases the runs.
709717
newMerge :: MergePolicyForLevel
710718
-> Merge.Level
711719
-> LevelNo
@@ -717,7 +725,12 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
717725
traceWith tr $ AtLevel ln $
718726
TraceNewMergeSingleRun (Run.size r)
719727
(Run.runFsPathsNumber r)
720-
pure (Single r)
728+
-- We create a fresh reference and release the original one.
729+
-- This will also make it easier to trace back where it was allocated.
730+
ir <- Single <$> allocateTemp reg (dupRef r) releaseRef
731+
freeTemp reg (releaseRef r)
732+
pure ir
733+
721734
| otherwise = do
722735
assert (let l = V.length rs in l >= 2 && l <= 5) $ pure ()
723736
!n <- incrUniqCounter uc
@@ -726,19 +739,12 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
726739
!runPaths = Paths.runPath root (uniqueToRunNumber n)
727740
traceWith tr $ AtLevel ln $
728741
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel
729-
-- TODO: There currently is a resource management bug that happens if an
730-
-- exception occurs after calling MR.new. In this case, all changes roll
731-
-- back, so some of the runs in rs will live in the Levels structure at
732-
-- their original places again. However, we passed their references to
733-
-- the MergingRun, which gets aborted, releasing the run references.
734-
-- Instead of passing the original references into newMerge, we have to
735-
-- duplicate the ones that previously existed in the level and then
736-
-- freeTemp the original ones. This way, on the happy path the result is
737-
-- the same, but if an exception occurs, the original references do not
738-
-- get released.
742+
-- The runs will end up inside the merging run, with fresh references.
743+
-- The original references can be released (but only on the happy path).
739744
mr <- allocateTemp reg
740745
(MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs)
741746
releaseRef
747+
V.forM_ rs $ \r -> freeTemp reg (releaseRef r)
742748
case confMergeSchedule of
743749
Incremental -> pure ()
744750
OneShot -> do

src/Database/LSMTree/Internal/MergingRun.hs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ instance NFData MergeKnownCompleted where
125125
-- | Create a new merging run, returning a reference to it that must ultimately
126126
-- be released via 'releaseRef'.
127127
--
128-
-- Takes over ownership of the references to the runs passed.
128+
-- Duplicates the supplied references to the runs.
129129
--
130130
-- This function should be run with asynchronous exceptions masked to prevent
131131
-- failing after internal resources have already been created.
@@ -141,14 +141,17 @@ new ::
141141
-> RunFsPaths
142142
-> V.Vector (Ref (Run m h))
143143
-> m (Ref (MergingRun m h))
144-
new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do
145-
merge <- fromMaybe (error "newMerge: merges can not be empty")
146-
<$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs
147-
let numInputRuns = NumRuns $ V.length runs
148-
let numInputEntries = V.foldMap' Run.size runs
149-
spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
150-
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
151-
OngoingMerge runs spentCreditsVar merge
144+
new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns =
145+
-- If creating the Merge fails, we must release the references again.
146+
withTempRegistry $ \reg -> do
147+
runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns
148+
merge <- fromMaybe (error "newMerge: merges can not be empty")
149+
<$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs
150+
let numInputRuns = NumRuns $ V.length runs
151+
let numInputEntries = V.foldMap' Run.size runs
152+
spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
153+
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
154+
OngoingMerge runs spentCreditsVar merge
152155

153156
{-# SPECIALISE newCompleted ::
154157
MergePolicyForLevel
@@ -158,16 +161,22 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do
158161
-> IO (Ref (MergingRun IO h)) #-}
159162
-- | Create a merging run that is already in the completed state, returning a
160163
-- reference that must ultimately be released via 'releaseRef'.
164+
--
165+
-- Duplicates the supplied reference to the run.
166+
--
167+
-- This function should be run with asynchronous exceptions masked to prevent
168+
-- failing after internal resources have already been created.
161169
newCompleted ::
162170
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
163171
=> MergePolicyForLevel
164172
-> NumRuns
165173
-> NumEntries
166174
-> Ref (Run m h)
167175
-> m (Ref (MergingRun m h))
168-
newCompleted mergePolicy numInputRuns numInputEntries run = do
169-
unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $
170-
CompletedMerge run
176+
newCompleted mergePolicy numInputRuns numInputEntries inputRun = do
177+
bracketOnError (dupRef inputRun) releaseRef $ \run ->
178+
unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $
179+
CompletedMerge run
171180

172181
{-# INLINE unsafeNew #-}
173182
unsafeNew ::

src/Database/LSMTree/Internal/Snapshot.hs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module Database.LSMTree.Internal.Snapshot (
1515
-- * Runs
1616
, snapshotRuns
1717
, openRuns
18+
, releaseRuns
1819
-- * Opening from levels snapshot format
1920
, fromSnapLevels
2021
-- * Hard links
@@ -31,7 +32,7 @@ import Control.Monad.Class.MonadThrow (MonadMask)
3132
import Control.Monad.Primitive (PrimMonad)
3233
import Control.RefCount
3334
import Control.TempRegistry
34-
import Data.Foldable (sequenceA_)
35+
import Data.Foldable (sequenceA_, traverse_)
3536
import Data.Primitive.PrimVar
3637
import Data.Text (Text)
3738
import Data.Traversable (for)
@@ -258,6 +259,8 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do
258259
-- into @targetDir@ with new, unique names (using @uniqCounter@). Each set of
259260
-- (hard linked) files that represents a run is opened and verified, returning
260261
-- 'Run's as a result.
262+
--
263+
-- The result must ultimately be released using 'releaseRuns'.
261264
openRuns ::
262265
(MonadMask m, MonadSTM m, MonadST m, MonadMVar m)
263266
=> TempRegistry m
@@ -287,6 +290,14 @@ openRuns
287290
releaseRef
288291
pure (SnapLevels levels')
289292

293+
{-# SPECIALISE releaseRuns ::
294+
TempRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO ()
295+
#-}
296+
releaseRuns ::
297+
(MonadMask m, MonadST m, MonadMVar m)
298+
=> TempRegistry m -> SnapLevels (Ref (Run m h)) -> m ()
299+
releaseRuns reg = traverse_ $ \r -> freeTemp reg (releaseRef r)
300+
290301
{-------------------------------------------------------------------------------
291302
Opening from levels snapshot format
292303
-------------------------------------------------------------------------------}
@@ -302,6 +313,7 @@ openRuns
302313
-> SnapLevels (Ref (Run IO h))
303314
-> IO (Levels IO h)
304315
#-}
316+
-- | Duplicates runs and re-creates merging runs.
305317
fromSnapLevels ::
306318
forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m)
307319
=> TempRegistry m
@@ -321,19 +333,17 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
321333
fromSnapLevel :: LevelNo -> SnapLevel (Ref (Run m h)) -> m (Level m h)
322334
fromSnapLevel ln SnapLevel{..} = do
323335
incomingRun <- fromSnapIncomingRun snapIncoming
324-
pure Level {
325-
incomingRun
326-
, residentRuns = snapResidentRuns
327-
}
336+
residentRuns <- V.mapM dupRun snapResidentRuns
337+
pure Level {incomingRun , residentRuns}
328338
where
329339
caching = diskCachePolicyForLevel confDiskCachePolicy ln
330340
alloc = bloomFilterAllocForLevel conf ln
331341

332342
fromSnapIncomingRun ::
333343
SnapIncomingRun (Ref (Run m h))
334344
-> m (IncomingRun m h)
335-
fromSnapIncomingRun (SnapSingleRun run) =
336-
pure (Single run)
345+
fromSnapIncomingRun (SnapSingleRun run) = do
346+
Single <$> dupRun run
337347
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do
338348
Merging <$> case smrs of
339349
SnapCompletedMerge run ->
@@ -344,7 +354,6 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
344354
mr <- allocateTemp reg
345355
(MR.new hfs hbio resolve caching alloc lvl mpfl (mkPath rn) runs)
346356
releaseRef
347-
348357
-- When a snapshot is created, merge progress is lost, so we
349358
-- have to redo merging work here. UnspentCredits and
350359
-- SpentCredits track how many credits were supplied before the
@@ -354,6 +363,8 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
354363
MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr
355364
return mr
356365

366+
dupRun r = allocateTemp reg (dupRef r) releaseRef
367+
357368
{-------------------------------------------------------------------------------
358369
Hard links
359370
-------------------------------------------------------------------------------}

0 commit comments

Comments
 (0)