@@ -43,7 +43,7 @@ import Control.Monad.Class.MonadThrow (MonadMask, bracket,
4343 bracketOnError )
4444import Control.Monad.Primitive (PrimMonad )
4545import Control.RefCount
46- import Data.Foldable (sequenceA_ )
46+ import Data.Foldable (sequenceA_ , traverse_ )
4747import Data.Text (Text )
4848import qualified Data.Vector as V
4949import Database.LSMTree.Internal.Config
@@ -245,60 +245,91 @@ instance NFData r => NFData (SnapPreExistingRun r) where
245245-------------------------------------------------------------------------------}
246246
247247{-# SPECIALISE fromSnapMergingTree ::
248- ActionRegistry IO
249- -> HasFS IO h
248+ HasFS IO h
250249 -> HasBlockIO IO h
251- -> TableConfig
252250 -> UniqCounter IO
253251 -> ResolveSerialisedValue
254252 -> ActiveDir
253+ -> ActionRegistry IO
255254 -> SnapMergingTree (Ref (Run IO h))
256255 -> IO (Ref (MT.MergingTree IO h))
257256 #-}
258- -- | Duplicates runs and re-creates merging runs.
257+ -- | Converts a snapshot of a merging tree of runs to a real merging tree.
258+ --
259+ -- Returns a new reference. Input runs remain owned by the caller.
259260fromSnapMergingTree ::
260261 forall m h . (MonadMask m , MonadMVar m , MonadSTM m , MonadST m )
261- => ActionRegistry m
262- -> HasFS m h
262+ => HasFS m h
263263 -> HasBlockIO m h
264- -> TableConfig
265264 -> UniqCounter m
266265 -> ResolveSerialisedValue
267266 -> ActiveDir
267+ -> ActionRegistry m
268268 -> SnapMergingTree (Ref (Run m h ))
269269 -> m (Ref (MT. MergingTree m h ))
270- fromSnapMergingTree reg hfs hbio conf uc resolve dir ( SnapMergingTree snapTreeState) =
271- fromSnapTreeState snapTreeState
270+ fromSnapMergingTree hfs hbio uc resolve dir =
271+ go
272272 where
273- -- Partially applied functions for convenience
274- recurrence :: SnapMergingTree (Ref (Run m h )) -> m (Ref (MT. MergingTree m h ))
275- recurrence = fromSnapMergingTree reg hfs hbio conf uc resolve dir
276-
277- getSnapMergingRunState
278- :: forall t .
279- MR. IsMergeType t
280- => SnapMergingRunState t (Ref (Run m h ))
281- -> m (Ref (MR. MergingRun t m h ))
282- getSnapMergingRunState = fromSnapMergingRunState hfs hbio uc resolve dir
283-
284- -- Conversion definitions
285- fromSnapTreeState :: SnapMergingTreeState (Ref (Run m h )) -> m (Ref (MT. MergingTree m h ))
286- fromSnapTreeState (SnapCompletedTreeMerge run) =
287- MT. newPendingLevelMerge [MT. PreExistingRun run] Nothing
288- fromSnapTreeState (SnapPendingTreeMerge pMerge) = case pMerge of
289- SnapPendingLevelMerge peRuns maybeMergeTree -> do
290- peRuns' <- traverse fromSnapPreExistingRun peRuns
291- maybeMergeTree' <- traverse recurrence maybeMergeTree
292- MT. newPendingLevelMerge peRuns' maybeMergeTree'
293- SnapPendingUnionMerge mergeTrees ->
294- MT. newPendingUnionMerge =<< traverse recurrence mergeTrees
295- fromSnapTreeState (SnapOngoingTreeMerge smrs) =
296- MT. newOngoingMerge =<< getSnapMergingRunState smrs
297-
298- fromSnapPreExistingRun :: SnapPreExistingRun (Ref (Run m h )) -> m (MT. PreExistingRun m h )
299- fromSnapPreExistingRun (SnapPreExistingRun run) = pure $ MT. PreExistingRun run
300- fromSnapPreExistingRun (SnapPreExistingMergingRun smrs) =
301- MT. PreExistingMergingRun <$> getSnapMergingRunState smrs
273+ -- Reference strategy:
274+ -- * go returns a fresh reference
275+ -- * go ensures the returned reference will be cleaned up on failure,
276+ -- using withRollback
277+ -- * All results from recursive calls must be released locally on the
278+ -- happy path.
279+ go :: ActionRegistry m
280+ -> SnapMergingTree (Ref (Run m h ))
281+ -> m (Ref (MT. MergingTree m h ))
282+
283+ go reg (SnapMergingTree (SnapCompletedTreeMerge run)) =
284+ withRollback reg
285+ (MT. newCompletedMerge run)
286+ releaseRef
287+
288+ go reg (SnapMergingTree (SnapPendingTreeMerge
289+ (SnapPendingLevelMerge prs mmt))) = do
290+ prs' <- traverse (fromSnapPreExistingRun reg) prs
291+ mmt' <- traverse (go reg) mmt
292+ mt <- withRollback reg
293+ (MT. newPendingLevelMerge prs' mmt')
294+ releaseRef
295+ traverse_ (delayedCommit reg . releasePER) prs'
296+ traverse_ (delayedCommit reg . releaseRef) mmt'
297+ return mt
298+
299+ go reg (SnapMergingTree (SnapPendingTreeMerge
300+ (SnapPendingUnionMerge mts))) = do
301+ mts' <- traverse (go reg) mts
302+ mt <- withRollback reg
303+ (MT. newPendingUnionMerge mts')
304+ releaseRef
305+ traverse_ (delayedCommit reg . releaseRef) mts'
306+ return mt
307+
308+ go reg (SnapMergingTree (SnapOngoingTreeMerge smrs)) = do
309+ mr <- withRollback reg
310+ (fromSnapMergingRunState hfs hbio uc resolve dir smrs)
311+ releaseRef
312+ mt <- withRollback reg
313+ (MT. newOngoingMerge mr)
314+ releaseRef
315+ delayedCommit reg (releaseRef mr)
316+ return mt
317+
318+ -- Returns fresh refs, which must be released locally.
319+ fromSnapPreExistingRun :: ActionRegistry m
320+ -> SnapPreExistingRun (Ref (Run m h ))
321+ -> m (MT. PreExistingRun m h )
322+ fromSnapPreExistingRun reg (SnapPreExistingRun run) =
323+ MT. PreExistingRun <$>
324+ withRollback reg (dupRef run) releaseRef
325+ fromSnapPreExistingRun reg (SnapPreExistingMergingRun smrs) =
326+ MT. PreExistingMergingRun <$>
327+ withRollback reg
328+ (fromSnapMergingRunState hfs hbio uc resolve dir smrs)
329+ releaseRef
330+
331+ releasePER (MT. PreExistingRun r) = releaseRef r
332+ releasePER (MT. PreExistingMergingRun mr) = releaseRef mr
302333
303334{- ------------------------------------------------------------------------------
304335 Conversion to merge tree snapshot format
0 commit comments