Skip to content

Commit 006c430

Browse files
authored
Merge pull request #463 from IntersectMBO/mheinzel/refcounter-merging-run
Make MergingRun reference-counted
2 parents 2044d00 + 98e32a5 commit 006c430

File tree

7 files changed

+389
-360
lines changed

7 files changed

+389
-360
lines changed

prototypes/ScheduledMerges.hs

Lines changed: 53 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ type Counter = Int
6868
type Levels s = [Level s]
6969

7070
data Level s =
71-
-- | A level with a sequence of runs at this level, prefixed by
72-
-- a sequence of incoming run or runs that are being merged, with the
73-
-- result run to live at this level.
74-
Level !(MergingRun s) ![Run]
71+
-- | A level is a sequence of resident runs at this level, prefixed by an
72+
-- incoming run, which is usually multiple runs that are being merged,
73+
-- with the result run to live at this level.
74+
Level !(IncomingRun s) ![Run]
7575

7676
-- | The merge policy for a LSM level can be either tiering or levelling.
7777
-- In this design we use levelling for the last level, and tiering for
@@ -82,25 +82,26 @@ data Level s =
8282
data MergePolicy = MergePolicyTiering | MergePolicyLevelling
8383
deriving stock (Eq, Show)
8484

85-
-- | A last level merge behaves differenrly from a mid-level merge: last level
85+
-- | A last level merge behaves differently from a mid-level merge: last level
8686
-- merges can actually remove delete operations, whereas mid-level merges must
8787
-- preserve them. This is orthogonal to the 'MergePolicy'.
8888
--
8989
data MergeLastLevel = MergeMidLevel | MergeLastLevel
9090
deriving stock (Eq, Show)
9191

92-
-- | A \"merging run\" is the representation of an ongoing incremental merge,
93-
-- and in mutable. It is also a unit of sharing between duplicated LSM handles.
92+
-- | We represent single runs specially, rather than putting them in as a
93+
-- 'CompletedMerge'. This is for two reasons: to see statically that it's a
94+
-- single run without having to read the 'STRef', and secondly to make it easier
95+
-- to avoid supplying merge credits. It's not essential, but simplifies things
96+
-- somewhat.
97+
data IncomingRun s = Merging !(MergingRun s)
98+
| Single !Run
99+
100+
-- | A \"merging run\" is a mutable representation of an incremental merge,
101+
-- It is also a unit of sharing between duplicated LSM handles.
94102
--
95-
data MergingRun s = MergingRun !MergePolicy !MergeLastLevel
96-
!(STRef s MergingRunState)
97-
| SingleRun !Run
98-
-- ^ We represent single runs specially, rather than
99-
-- putting them in as a 'CompletedMerge'. This is for two
100-
-- reasons: to see statically that it's a single run
101-
-- without having to read the 'STRef', and secondly
102-
-- to make it easier to avoid supplying merge credits.
103-
-- It's not essential, but simplifies things somewhat.
103+
data MergingRun s =
104+
MergingRun !MergePolicy !MergeLastLevel !(STRef s MergingRunState)
104105

105106
data MergingRunState = CompletedMerge !Run
106107

@@ -175,13 +176,14 @@ invariant = go 1
175176
go !ln (Level mr rs : ls) = do
176177

177178
mrs <- case mr of
178-
SingleRun r -> return (CompletedMerge r)
179-
MergingRun _ _ ref -> readSTRef ref
179+
Single r -> return (CompletedMerge r)
180+
Merging (MergingRun _ _ ref) -> readSTRef ref
180181

181182
assertST $ case mr of
182-
SingleRun{} -> True
183-
MergingRun mp ml _ -> mergePolicyForLevel ln ls == mp
184-
&& mergeLastForLevel ls == ml
183+
Single{} -> True
184+
Merging (MergingRun mp ml _) ->
185+
mergePolicyForLevel ln ls == mp
186+
&& mergeLastForLevel ls == ml
185187
assertST $ length rs <= 3
186188
expectedRunLengths ln rs ls
187189
expectedMergingRunLengths ln mr mrs ls
@@ -193,11 +195,11 @@ invariant = go 1
193195
expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s ()
194196
expectedRunLengths ln rs ls =
195197
case mergePolicyForLevel ln ls of
196-
-- Levels using levelling have only one run, and that single run is
197-
-- (almost) always involved in an ongoing merge. Thus there are no
198-
-- other "normal" runs. The exception is when a levelling run becomes
199-
-- too large and is promoted, in that case initially there's no merge,
200-
-- but it is still represented as a 'MergingRun', using 'SingleRun'.
198+
-- Levels using levelling have only one (incoming) run, which almost
199+
-- always consists of an ongoing merge. The exception is when a
200+
-- levelling run becomes too large and is promoted, in that case
201+
-- initially there's no merge, but it is still represented as an
202+
-- 'IncomingRun', using 'Single'. Thus there are no other resident runs.
201203
MergePolicyLevelling -> assertST $ null rs
202204
-- Runs in tiering levels usually fit that size, but they can be one
203205
-- larger, if a run has been held back (creating a 5-way merge).
@@ -209,7 +211,7 @@ invariant = go 1
209211

210212
-- Incoming runs being merged also need to be of the right size, but the
211213
-- conditions are more complicated.
212-
expectedMergingRunLengths :: Int -> MergingRun s -> MergingRunState
214+
expectedMergingRunLengths :: Int -> IncomingRun s -> MergingRunState
213215
-> [Level s] -> ST s ()
214216
expectedMergingRunLengths ln mr mrs ls =
215217
case mergePolicyForLevel ln ls of
@@ -218,7 +220,7 @@ invariant = go 1
218220
case (mr, mrs) of
219221
-- A single incoming run (which thus didn't need merging) must be
220222
-- of the expected size range already
221-
(SingleRun r, m) -> do
223+
(Single r, m) -> do
222224
assertST $ case m of CompletedMerge{} -> True
223225
OngoingMerge{} -> False
224226
assertST $ levellingRunSizeToLevel r == ln
@@ -246,7 +248,7 @@ invariant = go 1
246248
case (mr, mrs, mergeLastForLevel ls) of
247249
-- A single incoming run (which thus didn't need merging) must be
248250
-- of the expected size already
249-
(SingleRun r, m, _) -> do
251+
(Single r, m, _) -> do
250252
assertST $ case m of CompletedMerge{} -> True
251253
OngoingMerge{} -> False
252254
assertST $ tieringRunSizeToLevel r == ln
@@ -286,8 +288,8 @@ assertST p = assert p $ return (const () callStack)
286288

287289
newMerge :: Tracer (ST s) EventDetail
288290
-> Int -> MergePolicy -> MergeLastLevel
289-
-> [Run] -> ST s (MergingRun s)
290-
newMerge _ _ _ _ [r] = return (SingleRun r)
291+
-> [Run] -> ST s (IncomingRun s)
292+
newMerge _ _ _ _ [r] = return (Single r)
291293
newMerge tr level mergepolicy mergelast rs = do
292294
traceWith tr MergeStartedEvent {
293295
mergePolicy = mergepolicy,
@@ -298,7 +300,8 @@ newMerge tr level mergepolicy mergelast rs = do
298300
}
299301
assert (length rs `elem` [4, 5]) $
300302
assert (mergeDebtLeft debt >= cost) $
301-
MergingRun mergepolicy mergelast <$> newSTRef (OngoingMerge debt rs r)
303+
fmap (Merging . MergingRun mergepolicy mergelast) $
304+
newSTRef (OngoingMerge debt rs r)
302305
where
303306
cost = sum (map runSize rs)
304307
-- How much we need to discharge before the merge can be guaranteed
@@ -328,9 +331,9 @@ lastLevelMerge = Map.filter isInsert
328331

329332
expectCompletedMerge :: HasCallStack
330333
=> Tracer (ST s) EventDetail
331-
-> MergingRun s -> ST s Run
332-
expectCompletedMerge _ (SingleRun r) = return r
333-
expectCompletedMerge tr (MergingRun mergepolicy mergelast ref) = do
334+
-> IncomingRun s -> ST s Run
335+
expectCompletedMerge _ (Single r) = return r
336+
expectCompletedMerge tr (Merging (MergingRun mergepolicy mergelast ref)) = do
334337
mrs <- readSTRef ref
335338
case mrs of
336339
CompletedMerge r -> do
@@ -344,9 +347,9 @@ expectCompletedMerge tr (MergingRun mergepolicy mergelast ref) = do
344347
error $ "expectCompletedMerge: false expectation, remaining debt of "
345348
++ show d
346349

347-
supplyMergeCredits :: Credit -> MergingRun s -> ST s ()
348-
supplyMergeCredits _ SingleRun{} = return ()
349-
supplyMergeCredits c (MergingRun _ _ ref) = do
350+
supplyMergeCredits :: Credit -> IncomingRun s -> ST s ()
351+
supplyMergeCredits _ Single{} = return ()
352+
supplyMergeCredits c (Merging (MergingRun _ _ ref)) = do
350353
mrs <- readSTRef ref
351354
case mrs of
352355
CompletedMerge{} -> return ()
@@ -485,17 +488,19 @@ supplyCredits n =
485488
-- | The general case (and thus worst case) of how many merge credits we need
486489
-- for a level. This is based on the merging policy at the level.
487490
--
488-
creditsForMerge :: MergingRun s -> ST s Rational
489-
creditsForMerge SingleRun{} = return 0
491+
creditsForMerge :: IncomingRun s -> ST s Rational
492+
creditsForMerge Single{} =
493+
return 0
490494

491495
-- A levelling merge has 1 input run and one resident run, which is (up to) 4x
492496
-- bigger than the others.
493497
-- It needs to be completed before another run comes in.
494-
creditsForMerge (MergingRun MergePolicyLevelling _ _) = return $ (1 + 4) / 1
498+
creditsForMerge (Merging (MergingRun MergePolicyLevelling _ _)) =
499+
return $ (1 + 4) / 1
495500

496501
-- A tiering merge has 5 runs at most (once could be held back to merged again)
497502
-- and must be completed before the level is full (once 4 more runs come in).
498-
creditsForMerge (MergingRun MergePolicyTiering _ ref) = do
503+
creditsForMerge (Merging (MergingRun MergePolicyTiering _ ref)) = do
499504
readSTRef ref >>= \case
500505
CompletedMerge _ -> return 0
501506
OngoingMerge _ rs _ -> do
@@ -625,11 +630,11 @@ flattenLevels :: Levels s -> ST s [[Run]]
625630
flattenLevels = mapM flattenLevel
626631

627632
flattenLevel :: Level s -> ST s [Run]
628-
flattenLevel (Level mr rs) = (++rs) <$> flattenMergingRun mr
633+
flattenLevel (Level mr rs) = (++rs) <$> flattenIncomingRun mr
629634

630-
flattenMergingRun :: MergingRun s -> ST s [Run]
631-
flattenMergingRun (SingleRun r) = return [r]
632-
flattenMergingRun (MergingRun _ _ mr) = do
635+
flattenIncomingRun :: IncomingRun s -> ST s [Run]
636+
flattenIncomingRun (Single r) = return [r]
637+
flattenIncomingRun (Merging (MergingRun _ _ mr)) = do
633638
mrs <- readSTRef mr
634639
case mrs of
635640
CompletedMerge r -> return [r]
@@ -649,9 +654,9 @@ dumpRepresentation (LSMHandle _ lsmr) = do
649654
((Nothing, [wb]) :) <$> mapM dumpLevel ls
650655

651656
dumpLevel :: Level s -> ST s (Maybe (MergePolicy, MergeLastLevel, MergingRunState), [Run])
652-
dumpLevel (Level (SingleRun r) rs) =
657+
dumpLevel (Level (Single r) rs) =
653658
return (Nothing, (r:rs))
654-
dumpLevel (Level (MergingRun mp ml mr) rs) = do
659+
dumpLevel (Level (Merging (MergingRun mp ml mr)) rs) = do
655660
mrs <- readSTRef mr
656661
return (Just (mp, ml, mrs), rs)
657662

src-extras/Database/LSMTree/Extras/NoThunks.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,11 @@ deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
270270
, NoThunks (StrictMVar m (MergingRunState m h))
271271
) => NoThunks (Level m h)
272272

273+
deriving stock instance Generic (IncomingRun m h)
274+
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
275+
, NoThunks (StrictMVar m (MergingRunState m h))
276+
) => NoThunks (IncomingRun m h)
277+
273278
deriving stock instance Generic (MergingRun m h)
274279
deriving anyclass instance ( Typeable m, Typeable (PrimState m), Typeable h
275280
, NoThunks (StrictMVar m (MergingRunState m h))

src/Database/LSMTree/Internal/Merge.hs

Lines changed: 30 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@ module Database.LSMTree.Internal.Merge (
77
, Mappend
88
, MergeState (..)
99
, new
10-
, addReference
11-
, removeReference
12-
, removeReferenceN
13-
, readRefCount
10+
, abort
1411
, complete
1512
, stepsToCompletion
1613
, stepsToCompletionCounted
@@ -22,17 +19,15 @@ import Control.Exception (assert)
2219
import Control.Monad (when)
2320
import Control.Monad.Class.MonadST (MonadST)
2421
import Control.Monad.Class.MonadSTM (MonadSTM (..))
25-
import Control.Monad.Class.MonadThrow (MonadCatch, MonadMask (..),
22+
import Control.Monad.Class.MonadThrow (MonadCatch, MonadMask,
2623
MonadThrow (..))
2724
import Control.Monad.Fix (MonadFix)
28-
import Control.Monad.Primitive (PrimMonad, PrimState, RealWorld)
29-
import Control.RefCount (RefCount (..), RefCounter)
30-
import qualified Control.RefCount as RC
25+
import Control.Monad.Primitive (PrimState)
26+
import Control.RefCount (RefCount (..))
3127
import Data.Coerce (coerce)
3228
import Data.Primitive.MutVar
3329
import Data.Traversable (for)
3430
import qualified Data.Vector as V
35-
import Data.Word
3631
import Database.LSMTree.Internal.BlobRef (RawBlobRef)
3732
import Database.LSMTree.Internal.Entry
3833
import Database.LSMTree.Internal.Run (Run, RunDataCaching)
@@ -44,7 +39,7 @@ import qualified Database.LSMTree.Internal.RunReader as Reader
4439
import Database.LSMTree.Internal.RunReaders (Readers)
4540
import qualified Database.LSMTree.Internal.RunReaders as Readers
4641
import Database.LSMTree.Internal.Serialise
47-
import GHC.Stack (HasCallStack)
42+
import qualified System.FS.API as FS
4843
import System.FS.API (HasFS)
4944
import System.FS.BlockIO.API (HasBlockIO)
5045

@@ -62,7 +57,6 @@ data Merge m h = Merge {
6257
-- | The result of the latest call to 'steps'. This is used to determine
6358
-- whether a merge can be 'complete'd.
6459
, mergeState :: !(MutVar (PrimState m) MergeState)
65-
, mergeRefCounter :: !(RefCounter m)
6660
, mergeHasFS :: !(HasFS m h)
6761
, mergeHasBlockIO :: !(HasBlockIO m h)
6862
}
@@ -98,7 +92,7 @@ type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue
9892
-- | Returns 'Nothing' if no input 'Run' contains any entries.
9993
-- The list of runs should be sorted from new to old.
10094
new ::
101-
(MonadCatch m, MonadSTM m, MonadST m, MonadFix m)
95+
(MonadCatch m, MonadSTM m, MonadST m)
10296
=> HasFS m h
10397
-> HasBlockIO m h
10498
-> RunDataCaching
@@ -116,67 +110,42 @@ new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
116110
let numEntries = coerce (sum @V.Vector @Int) (fmap Run.runNumEntries runs)
117111
mergeBuilder <- Builder.new fs hbio targetPaths numEntries alloc
118112
mergeState <- newMutVar $! Merging
119-
mergeRefCounter <-
120-
RC.mkRefCounter1 (Just $! finaliser mergeState mergeBuilder mergeReaders)
121113
return Merge {
122114
mergeHasFS = fs
123115
, mergeHasBlockIO = hbio
124116
, ..
125117
}
126118

127-
{-# SPECIALISE addReference :: Merge IO h -> IO () #-}
128-
addReference :: (HasCallStack, PrimMonad m) => Merge m h -> m ()
129-
addReference Merge{..} = RC.addReference mergeRefCounter
130-
131-
{-# SPECIALISE removeReference :: Merge IO h -> IO () #-}
132-
removeReference :: (HasCallStack, PrimMonad m, MonadMask m) => Merge m h -> m ()
133-
removeReference Merge{..} = RC.removeReference mergeRefCounter
134-
135-
{-# SPECIALISE removeReferenceN :: Merge IO h -> Word64 -> IO () #-}
136-
removeReferenceN :: (HasCallStack, PrimMonad m, MonadMask m) => Merge m h -> Word64 -> m ()
137-
removeReferenceN r = RC.removeReferenceN (mergeRefCounter r)
138-
139-
{-# SPECIALISE readRefCount :: Merge IO h -> IO RefCount #-}
140-
readRefCount :: PrimMonad m => Merge m h -> m RefCount
141-
readRefCount Merge{..} = RC.readRefCount mergeRefCounter
142-
143-
{-# SPECIALISE finaliser ::
144-
MutVar RealWorld MergeState
145-
-> RunBuilder IO h
146-
-> Readers IO h
147-
-> IO () #-}
148-
-- | Closes the underlying builder and readers.
119+
{-# SPECIALISE abort :: Merge IO (FS.Handle h) -> IO () #-}
120+
-- | This function should be called when discarding a 'Merge' before it
121+
-- was done (i.e. returned 'MergeComplete'). This removes the incomplete files
122+
-- created for the new run so far and avoids leaking file handles.
149123
--
150-
-- This function is idempotent. Technically, this is not necessary because the
151-
-- finaliser is going to run only once, but it is a nice property for
152-
-- @close@-like functions to be idempotent.
153-
finaliser ::
154-
(MonadFix m, MonadSTM m, MonadST m)
155-
=> MutVar (PrimState m) MergeState
156-
-> RunBuilder m h
157-
-> Readers m h
158-
-> m ()
159-
finaliser var b rs = do
160-
st <- readMutVar var
161-
let shouldClose = case st of
162-
Merging -> True
163-
MergingDone -> True
164-
Completed -> False
165-
Closed -> False
166-
when shouldClose $ do
167-
Builder.close b
168-
Readers.close rs
169-
writeMutVar var $! Closed
124+
-- Once it has been called, do not use the 'Merge' any more!
125+
abort :: (MonadSTM m, MonadST m) => Merge m h -> m ()
126+
abort Merge {..} = do
127+
readMutVar mergeState >>= \case
128+
Merging -> do
129+
Readers.close mergeReaders
130+
Builder.close mergeBuilder
131+
MergingDone -> do
132+
-- the readers are already drained, therefore closed
133+
Builder.close mergeBuilder
134+
Completed ->
135+
assert False $ pure ()
136+
Closed ->
137+
assert False $ pure ()
138+
writeMutVar mergeState $! Closed
170139

171140
{-# SPECIALISE complete ::
172141
Merge IO h
173142
-> IO (Run IO h) #-}
174143
-- | Complete a 'Merge', returning a new 'Run' as the result of merging the
175144
-- input runs.
176145
--
177-
-- The resulting run has the same reference count as the input 'Merge'. The
178-
-- 'Merge' does not have to be closed afterwards, since it is closed implicitly
179-
-- by 'complete'.
146+
-- All resources held by the merge are released, so do not use the it any more!
147+
--
148+
-- The resulting run has a reference count of 1.
180149
--
181150
-- This function will /not/ do any merging work if there is any remaining. That
182151
-- is, if not enough 'steps' were performed to exhaust the input 'Readers', this
@@ -196,14 +165,8 @@ complete Merge{..} = do
196165
readMutVar mergeState >>= \case
197166
Merging -> error "complete: Merge is not done"
198167
MergingDone -> do
199-
-- Since access to a merge /should/ be sequentialised, we can assume
200-
-- that the ref count has not changed between this read and the use of
201-
-- fromMutable.
202-
--
203-
-- TODO: alternatively, the mergeRefCounter could be reused as the
204-
-- reference counter for the output run.
205-
n <- RC.readRefCount mergeRefCounter
206-
r <- Run.fromMutable mergeCaching n mergeBuilder
168+
-- the readers are already drained, therefore closed
169+
r <- Run.fromMutable mergeCaching (RefCount 1) mergeBuilder
207170
writeMutVar mergeState $! Completed
208171
pure r
209172
Completed -> error "complete: Merge is already completed"

0 commit comments

Comments
 (0)