@@ -28,6 +28,7 @@ module Database.LSMTree.Internal.MergeSchedule (
2828 , newIncomingSingleRun
2929 , newIncomingCompletedMergingRun
3030 , newIncomingMergingRun
31+ , releaseIncomingRun
3132 , supplyCreditsIncomingRun
3233 , snapshotIncomingRun
3334 -- * Union level
@@ -349,7 +350,7 @@ releaseLevels ::
349350 -> m ()
350351releaseLevels reg levels =
351352 V. forM_ levels $ \ Level {incomingRun, residentRuns} -> do
352- releaseIncomingRun reg incomingRun
353+ delayedCommit reg (releaseIncomingRun incomingRun)
353354 V. mapM_ (delayedCommit reg . releaseRef) residentRuns
354355
355356{-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-}
@@ -406,34 +407,33 @@ duplicateIncomingRun reg (Merging mp md mcv mr) =
406407 Merging mp md <$> (newPrimVar =<< readPrimVar mcv)
407408 <*> withRollback reg (dupRef mr) releaseRef
408409
409- {-# SPECIALISE releaseIncomingRun :: ActionRegistry IO -> IncomingRun IO h -> IO () #-}
410+ {-# SPECIALISE releaseIncomingRun :: IncomingRun IO h -> IO () #-}
410411releaseIncomingRun ::
411412 (PrimMonad m , MonadMask m )
412- => ActionRegistry m
413- -> IncomingRun m h -> m ()
414- releaseIncomingRun reg (Single r) = delayedCommit reg (releaseRef r)
415- releaseIncomingRun reg (Merging _ _ _ mr) = delayedCommit reg (releaseRef mr)
413+ => IncomingRun m h -> m ()
414+ releaseIncomingRun (Single r) = releaseRef r
415+ releaseIncomingRun (Merging _ _ _ mr) = releaseRef mr
416416
417417{-# SPECIALISE newIncomingSingleRun ::
418418 Tracer IO (AtLevel MergeTrace)
419419 -> LevelNo
420420 -> Ref (Run IO h)
421421 -> IO (IncomingRun IO h) #-}
422422newIncomingSingleRun ::
423- Monad m
423+ PrimMonad m
424424 => Tracer m (AtLevel MergeTrace )
425425 -> LevelNo
426426 -> Ref (Run m h )
427427 -> m (IncomingRun m h )
428428newIncomingSingleRun tr ln r = do
429+ r' <- dupRef r
429430 traceWith tr $ AtLevel ln $
430- TraceNewMergeSingleRun (Run. size r) (Run. runFsPathsNumber r)
431- return (Single r)
431+ TraceNewMergeSingleRun (Run. size r' ) (Run. runFsPathsNumber r' )
432+ return (Single r' )
432433
433434{-# SPECIALISE newIncomingCompletedMergingRun ::
434435 Tracer IO (AtLevel MergeTrace)
435436 -> TableConfig
436- -> ActionRegistry IO
437437 -> LevelNo
438438 -> MergePolicyForLevel
439439 -> NumRuns
@@ -444,17 +444,16 @@ newIncomingCompletedMergingRun ::
444444 (MonadMask m , MonadMVar m , MonadSTM m , MonadST m )
445445 => Tracer m (AtLevel MergeTrace )
446446 -> TableConfig
447- -> ActionRegistry m
448447 -> LevelNo
449448 -> MergePolicyForLevel
450449 -> NumRuns
451450 -> MergeDebt
452451 -> Ref (Run m h )
453452 -> m (IncomingRun m h )
454- newIncomingCompletedMergingRun tr conf reg ln mergePolicy nr mergeDebt r = do
453+ newIncomingCompletedMergingRun tr conf ln mergePolicy nr mergeDebt r = do
455454 traceWith tr $ AtLevel ln $
456455 TraceNewMergeCompletedRun (Run. size r) (Run. runFsPathsNumber r)
457- mr <- withRollback reg ( MR. newCompleted nr mergeDebt r) releaseRef
456+ mr <- MR. newCompleted nr mergeDebt r
458457 let nominalDebt = nominalDebtForLevel conf ln
459458 nominalCredits = nominalDebtAsCredits nominalDebt
460459 nominalCreditsVar <- newPrimVar nominalCredits
@@ -468,7 +467,6 @@ newIncomingCompletedMergingRun tr conf reg ln mergePolicy nr mergeDebt r = do
468467 -> UniqCounter IO
469468 -> TableConfig
470469 -> ResolveSerialisedValue
471- -> ActionRegistry IO
472470 -> MergePolicyForLevel
473471 -> MR.LevelMergeType
474472 -> LevelNo
@@ -483,7 +481,6 @@ newIncomingMergingRun ::
483481 -> UniqCounter m
484482 -> TableConfig
485483 -> ResolveSerialisedValue
486- -> ActionRegistry m
487484 -> MergePolicyForLevel
488485 -> MR. LevelMergeType
489486 -> LevelNo
@@ -494,8 +491,7 @@ newIncomingMergingRun tr hfs hbio activeDir uc
494491 confDiskCachePolicy,
495492 confFencePointerIndex
496493 }
497- resolve reg
498- mergePolicy mergeType ln rs = do
494+ resolve mergePolicy mergeType ln rs = do
499495 ! rn <- uniqueToRunNumber <$> incrUniqCounter uc
500496 let ! caching = diskCachePolicyForLevel confDiskCachePolicy ln
501497 ! alloc = bloomFilterAllocForLevel conf ln
@@ -504,11 +500,9 @@ newIncomingMergingRun tr hfs hbio activeDir uc
504500 traceWith tr $ AtLevel ln $
505501 TraceNewMerge (V. map Run. size rs) (runNumber runPaths)
506502 caching alloc mergePolicy mergeType
507- mr <- withRollback reg
508- (MR. new hfs hbio resolve caching
509- alloc indexType mergeType
510- runPaths rs)
511- releaseRef
503+ mr <- MR. new hfs hbio resolve caching
504+ alloc indexType mergeType
505+ runPaths rs
512506 let nominalDebt = nominalDebtForLevel conf ln
513507 nominalCredits = NominalCredits 0
514508 nominalCreditsVar <- newPrimVar nominalCredits
@@ -1061,33 +1055,30 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul
10611055 TraceExpectCompletedMerge (Run. runFsPathsNumber r)
10621056 pure r
10631057
1064- -- Releases the runs.
1058+ -- Consumes and releases the runs.
10651059 newMerge :: MergePolicyForLevel
10661060 -> MR. LevelMergeType
10671061 -> LevelNo
10681062 -> V. Vector (Ref (Run m h ))
10691063 -> m (IncomingRun m h )
1070- newMerge mergePolicy mergeType ln rs
1071- | Just (r, rest) <- V. uncons rs
1072- , V. null rest = do
1073- -- We create a fresh reference and release the original one.
1074- -- This will also make it easier to trace back where it was allocated.
1075- r' <- withRollback reg (dupRef r) releaseRef
1076- ir <- newIncomingSingleRun tr ln r'
1077- delayedCommit reg (releaseRef r)
1078- pure ir
1079-
1080- | otherwise = assert (let l = V. length rs in l >= 2 && l <= 5 ) $ do
1081- ir <- newIncomingMergingRun tr hfs hbio (Paths. activeDir root) uc
1082- conf resolve reg
1083- mergePolicy mergeType ln rs
1084- -- The runs will end up inside the merging run, with fresh references.
1085- -- The original references can be released (but only on the happy path).
1086- V. forM_ rs $ \ r -> delayedCommit reg (releaseRef r)
1087- case confMergeSchedule of
1088- Incremental -> pure ()
1089- OneShot -> immediatelyCompleteIncomingRun tr conf ln ir
1090- return ir
1064+ newMerge mergePolicy mergeType ln rs = do
1065+ ir <- withRollback reg
1066+ (case V. uncons rs of
1067+ Just (r, rest) | V. null rest
1068+ -> newIncomingSingleRun tr ln r
1069+ _ -> newIncomingMergingRun tr hfs hbio
1070+ (Paths. activeDir root) uc
1071+ conf resolve mergePolicy mergeType
1072+ ln rs)
1073+ releaseIncomingRun
1074+ -- The runs will end up inside the incoming/merging run, with fresh
1075+ -- references (since newIncoming* will make duplicates).
1076+ -- The original references must be released (but only on the happy path).
1077+ V. forM_ rs $ \ r -> delayedCommit reg (releaseRef r)
1078+ case confMergeSchedule of
1079+ Incremental -> pure ()
1080+ OneShot -> immediatelyCompleteIncomingRun tr conf ln ir
1081+ return ir
10911082
10921083-- | We use levelling on the last level, unless that is also the first level.
10931084mergePolicyForLevel ::
0 commit comments