
Commit 42c6f7d

Implement scheduled merges
The following features will be implemented later: rebuilding caches on lookups, and doing merging work in batches.
1 parent 69e5cf8 commit 42c6f7d

9 files changed: +283 −55 lines changed


src-extras/Database/LSMTree/Extras/NoThunks.hs

Lines changed: 6 additions & 0 deletions
@@ -277,6 +277,12 @@ deriving stock instance Generic (MergingRunState m h)
 deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
                         => NoThunks (MergingRunState m h)
 
+deriving stock instance Generic MergePolicyForLevel
+deriving anyclass instance NoThunks MergePolicyForLevel
+
+deriving stock instance Generic NumRuns
+deriving anyclass instance NoThunks NumRuns
+
 {-------------------------------------------------------------------------------
   Entry
 -------------------------------------------------------------------------------}
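For reference, the standalone derivings added above follow the standard nothunks pattern: the stock-derived Generic instance gives the NoThunks generic default a structure to traverse. A minimal self-contained sketch of the pattern (the module name is illustrative, not from this commit):

{-# LANGUAGE DeriveAnyClass     #-}
{-# LANGUAGE DeriveGeneric      #-}
{-# LANGUAGE DerivingStrategies #-}
module NoThunksSketch where

import GHC.Generics (Generic)
import NoThunks.Class (NoThunks) -- from the nothunks package

-- The stock-derived 'Generic' instance supplies the representation; the
-- anyclass-derived 'NoThunks' instance then uses the generic default to
-- check every field for unexpected thunks at runtime.
newtype NumRuns = NumRuns { unNumRuns :: Int }
  deriving stock (Show, Eq, Generic)
  deriving anyclass NoThunks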

src/Database/LSMTree/Internal.hs

Lines changed: 11 additions & 3 deletions
@@ -96,7 +96,7 @@ import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
 import qualified Database.LSMTree.Internal.BlobRef as BlobRef
 import Database.LSMTree.Internal.Config
 import qualified Database.LSMTree.Internal.Cursor as Cursor
-import Database.LSMTree.Internal.Entry (Entry)
+import Database.LSMTree.Internal.Entry (Entry, unNumEntries)
 import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
                      ResolveSerialisedValue, lookupsIO)
 import Database.LSMTree.Internal.MergeSchedule
@@ -1108,6 +1108,12 @@ snapshot resolve snap label th = do
       (RW.unsafeAcquireWriteAccess (tableContent thEnv))
       (atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv))
       $ \reg content -> do
+        -- TODO: When we flush the buffer here, it might be underfull, which
+        -- could mess up the scheduling. The conservative approach is to supply
+        -- credits as if the buffer was full, and then flush the (possibly)
+        -- underfull buffer. However, note that this bit of code here is
+        -- probably going to change anyway because of #392.
+        supplyCredits (unNumEntries $ case confWriteBufferAlloc conf of AllocNumEntries x -> x) (tableLevels content)
         content' <- flushWriteBuffer
           (TraceMerge `contramap` tableTracer th)
           conf
@@ -1140,6 +1146,7 @@
      -> SnapshotLabel
      -> TableConfigOverride
      -> SnapshotName
+     -> ResolveSerialisedValue
      -> IO (TableHandle IO h) #-}
 -- | See 'Database.LSMTree.Normal.open'.
 open ::
@@ -1148,8 +1155,9 @@
   -> SnapshotLabel -- ^ Expected label
   -> TableConfigOverride -- ^ Optional config override
   -> SnapshotName
+  -> ResolveSerialisedValue
   -> m (TableHandle m h)
-open sesh label override snap = do
+open sesh label override snap resolve = do
     traceWith (sessionTracer sesh) $ TraceOpenSnapshot snap override
     withOpenSession sesh $ \seshEnv -> do
       withTempRegistry $ \reg -> do
@@ -1173,7 +1181,7 @@ open sesh label override snap = do
             <- allocateTemp reg
                  (WBB.new hfs blobpath)
                  WBB.removeReference
-        tableLevels <- openLevels reg hfs hbio conf (sessionRoot seshEnv) snappedLevels
+        tableLevels <- openLevels reg hfs hbio conf (sessionUniqCounter seshEnv) (sessionRoot seshEnv) resolve snappedLevels
         tableCache <- mkLevelsCache reg tableLevels
         newWith reg sesh seshEnv conf' am $! TableContent {
             tableWriteBuffer = WB.empty
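To make the conservative accounting described in the snapshot TODO above concrete, here is a hedged sketch; the types are simplified stand-ins for the ones in the diff, and conservativeCredits is a hypothetical helper, not part of this commit:

-- Simplified stand-ins for the configuration types used in the diff.
newtype NumEntries = NumEntries { unNumEntries :: Int }

data WriteBufferAlloc = AllocNumEntries !NumEntries

-- Hypothetical helper: supply credits for the configured buffer capacity
-- rather than for the entries actually buffered, so merges cannot fall
-- behind schedule even when the flushed buffer is underfull.
conservativeCredits :: WriteBufferAlloc -> Int
conservativeCredits (AllocNumEntries n) = unNumEntries n

-- With a capacity of 1000 entries and only 600 buffered at snapshot time,
-- this still supplies 1000 credits before flushing.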

src/Database/LSMTree/Internal/Merge.hs

Lines changed: 1 addition & 0 deletions
@@ -250,6 +250,7 @@ stepsToCompletionCounted m stepBatchSize = go 0
         in (stepsSum',) <$> complete m
 
 data StepResult = MergeInProgress | MergeComplete
+  deriving stock Eq
 
 stepsInvariant :: Int -> (Int, StepResult) -> Bool
 stepsInvariant requestedSteps = \case

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 183 additions & 26 deletions
@@ -1,6 +1,8 @@
 {-# LANGUAGE CPP #-}
 {-# LANGUAGE DataKinds #-}
 
+-- TODO: establish that this implementation matches up with the ScheduledMerges
+-- prototype. See lsm-tree#445.
 module Database.LSMTree.Internal.MergeSchedule (
     -- * Traces
     AtLevel (..)
@@ -16,32 +18,42 @@ module Database.LSMTree.Internal.MergeSchedule (
   , Levels
   , Level (..)
   , MergingRun (..)
+  , NumRuns (..)
   , MergingRunState (..)
     -- * Flushes and scheduled merges
   , updatesWithInterleavedFlushes
   , flushWriteBuffer
     -- * Exported for cabal-docspec
   , MergePolicyForLevel (..)
   , maxRunSize
+    -- * Credits
+  , supplyCredits
+  , ScaledCredits (..)
+  , supplyMergeCredits
   ) where
 
 import Control.Concurrent.Class.MonadMVar.Strict
+import Control.Monad (when)
 import Control.Monad.Class.MonadST (MonadST)
 import Control.Monad.Class.MonadSTM (MonadSTM (..))
 import Control.Monad.Class.MonadThrow (MonadCatch, MonadMask,
                      MonadThrow (..))
 import Control.Monad.Fix (MonadFix)
 import Control.Monad.Primitive
+import Control.RefCount (RefCount (..))
 import Control.TempRegistry
 import Control.Tracer
 import Data.BloomFilter (Bloom)
+import Data.Primitive.PrimVar
 import qualified Data.Vector as V
-import Database.LSMTree.Internal.Assertions (assert)
+import Database.LSMTree.Internal.Assertions (assert,
+                     fromIntegralChecked)
 import Database.LSMTree.Internal.Config
-import Database.LSMTree.Internal.Entry (Entry, NumEntries (..))
+import Database.LSMTree.Internal.Entry (Entry, NumEntries (..),
+                     unNumEntries)
 import Database.LSMTree.Internal.IndexCompact (IndexCompact)
 import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
-import Database.LSMTree.Internal.Merge (Merge)
+import Database.LSMTree.Internal.Merge (Merge, StepResult (..))
 import qualified Database.LSMTree.Internal.Merge as Merge
 import Database.LSMTree.Internal.Paths (RunFsPaths (..),
                      SessionRoot (..))
@@ -200,6 +212,12 @@ mkLevelsCache reg lvls = do
 --
 -- * Keep the cache feature, but force a rebuild every once in a while, e.g.,
 --   once in every 100 lookups.
+--
+-- TODO: rebuilding the cache can invalidate blob references if the cache was
+-- holding the last reference to a run. This is not really a problem of just
+-- the caching approach, but of allowing merges to finish early. We should come
+-- up with a solution to keep blob references valid until the next /update/
+-- comes along. Lookups should not invalidate blob references.
 rebuildCache ::
      (PrimMonad m, MonadMVar m, MonadMask m)
   => TempRegistry m
@@ -251,12 +269,26 @@ data Level m h = Level {
 
 -- | A merging run is either a single run, or some ongoing merge.
 data MergingRun m h =
-    MergingRun !(StrictMVar m (MergingRunState m h))
+    -- | A merge of multiple runs.
+    MergingRun !MergePolicyForLevel !NumRuns !(StrictMVar m (MergingRunState m h))
+    -- | The result of merging a single run is a single run.
   | SingleRun !(Run m h)
 
+newtype NumRuns = NumRuns { unNumRuns :: Int }
+  deriving stock (Show, Eq)
+
 data MergingRunState m h =
-    CompletedMerge !(Run m h)
-  | OngoingMerge !(V.Vector (Run m h)) !(Merge m h)
+    CompletedMerge
+      !(Run m h)
+      -- ^ Output run
+  | OngoingMerge
+      !(V.Vector (Run m h))
+      -- ^ Input runs
+      !(PrimVar (PrimState m) Int)
+      -- ^ The total number of performed merging steps.
+      !(PrimVar (PrimState m) Int)
+      -- ^ The total number of supplied credits.
+      !(Merge m h)
 
 {-# SPECIALISE addReferenceLevels :: TempRegistry IO -> Levels IO h -> IO () #-}
 addReferenceLevels ::
@@ -294,9 +326,9 @@ forRunAndMergeM_ ::
 forRunAndMergeM_ lvls k1 k2 = V.forM_ lvls $ \(Level mr rs) -> do
     case mr of
       SingleRun r -> k1 r
-      MergingRun var -> withMVar var $ \case
+      MergingRun _ _ var -> withMVar var $ \case
         CompletedMerge r -> k1 r
-        OngoingMerge irs m -> V.mapM_ k1 irs >> k2 m
+        OngoingMerge irs _ _ m -> V.mapM_ k1 irs >> k2 m
     V.mapM_ k1 rs
 
 {-# SPECIALISE foldRunM ::
@@ -313,9 +345,9 @@ foldRunM ::
 foldRunM f x lvls = flip (flip V.foldM x) lvls $ \y (Level mr rs) -> do
     z <- case mr of
       SingleRun r -> f y r
-      MergingRun var -> withMVar var $ \case
+      MergingRun _ _ var -> withMVar var $ \case
         CompletedMerge r -> f y r
-        OngoingMerge irs _m -> V.foldM f y irs
+        OngoingMerge irs _ _ _m -> V.foldM f y irs
     V.foldM f z rs
 
 {-# SPECIALISE forRunM ::
@@ -391,6 +423,11 @@ updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es reg tc = do
     let wb = tableWriteBuffer tc
         wbblobs = tableWriteBufferBlobs tc
     (wb', es') <- addWriteBufferEntries hfs resolve wbblobs maxn wb es
+    -- Supply credits before flushing, so that we complete merges in time. The
+    -- number of supplied credits is based on the size increase of the write
+    -- buffer, not the number of processed entries @length es' - length es@.
+    let numAdded = unNumEntries (WB.numEntries wb') - unNumEntries (WB.numEntries wb)
+    supplyCredits numAdded (tableLevels tc)
     let tc' = tc { tableWriteBuffer = wb' }
     if WB.numEntries wb' < maxn then do
       pure $! tc'
@@ -635,9 +672,6 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
         mr <- newMerge policyForLevel Merge.LastLevel ln rs
         return $ V.singleton $ Level mr V.empty
     go !ln rs' (V.uncons -> Just (Level mr rs, ls)) = do
-        -- TODO: until we have proper scheduling, the merging run is actually
-        -- always stepped to completion immediately, so we can see it is just a
-        -- single run.
         r <- expectCompletedMerge ln mr
         case mergePolicyForLevel confMergePolicy ln ls of
           -- If r is still too small for this level then keep it and merge again
@@ -683,15 +717,16 @@
     expectCompletedMerge ln (SingleRun r) = do
         traceWith tr $ AtLevel ln $ TraceExpectCompletedMergeSingleRun (runNumber $ Run.runRunFsPaths r)
         pure r
-    expectCompletedMerge ln (MergingRun var) = do
-        withMVar var $ \case
-          CompletedMerge r -> do
-            traceWith tr $ AtLevel ln $ TraceExpectCompletedMerge (runNumber $ Run.runRunFsPaths r)
-            pure r
-          OngoingMerge _rs _m -> error "expectCompletedMerge: OngoingMerge not yet supported" -- TODO: implement.
-
-    -- TODO: Until we implement proper scheduling, this does not only start a
-    -- merge, but it also steps it to completion.
+    expectCompletedMerge ln (MergingRun _ _ var) = do
+        r <- withMVar var $ \case
+          CompletedMerge r -> pure r
+          OngoingMerge{} -> do
+            -- If the algorithm finds an ongoing merge here, then it is a bug in
+            -- our merge scheduling algorithm. As such, we throw a pure error.
+            error "expectCompletedMerge: expected a completed merge, but found an ongoing merge"
+        traceWith tr $ AtLevel ln $ TraceExpectCompletedMerge (runNumber $ Run.runRunFsPaths r)
+        pure r
+
     newMerge :: MergePolicyForLevel
              -> Merge.Level
@@ -716,12 +751,22 @@
                 Run.removeReference
         traceWith tr $ AtLevel ln $ TraceCompletedMerge (Run.runNumEntries r) (runNumber $ Run.runRunFsPaths r)
         V.mapM_ (freeTemp reg . Run.removeReference) rs
-        var <- newMVar (CompletedMerge r)
-        pure $! MergingRun var
-      Incremental -> error "newMerge: Incremental is not yet supported" -- TODO: implement
+        var <- newMVar $! CompletedMerge r
+        pure $! MergingRun mergepolicy (NumRuns $ V.length rs) var
+      Incremental -> do
+        mergeMaybe <- allocateMaybeTemp reg
+          (Merge.new hfs hbio caching alloc mergelast resolve runPaths rs)
+          Merge.removeReference
+        case mergeMaybe of
+          Nothing -> error "newMerge: merges can not be empty"
+          Just m -> do
+            totalStepsVar <- newPrimVar $! 0
+            totalCreditsVar <- newPrimVar $! 0
+            var <- newMVar $! OngoingMerge rs totalStepsVar totalCreditsVar m
+            pure $! MergingRun mergepolicy (NumRuns $ V.length rs) var
 
 data MergePolicyForLevel = LevelTiering | LevelLevelling
-  deriving stock Show
+  deriving stock (Show, Eq)
 
 mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels m h -> MergePolicyForLevel
 mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels
@@ -798,3 +843,115 @@ mergeRuns resolve hfs hbio caching alloc runPaths mergeLevel runs = do
     Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs >>= \case
       Nothing -> error "mergeRuns: no inputs"
       Just m -> Merge.stepsToCompletion m 1024
+
+{-------------------------------------------------------------------------------
+  Credits
+-------------------------------------------------------------------------------}
+
+type Credit = Int
+
+{-# SPECIALISE supplyCredits ::
+     Credit
+  -> Levels IO h
+  -> IO ()
+  #-}
+supplyCredits ::
+     (MonadSTM m, MonadST m, MonadMVar m, MonadMask m, MonadFix m)
+  => Credit
+  -> Levels m h
+  -> m ()
+supplyCredits c levels =
+    V.iforM_ levels $ \_i (Level mr _rs) ->
+      -- let !ln = i + 1 in
+      let !c' = scaleCreditsForMerge mr c in
+      supplyMergeCredits c' mr
+
+-- | 'Credit's scaled based on the merge requirements for merging runs. See
+-- 'scaleCreditsForMerge'.
+newtype ScaledCredits = ScaledCredits Int
+
+-- | Scale a number of credits to a number of merge steps to be performed, based
+-- on the merging run.
+--
+-- Initially, 1 update supplies 1 credit. However, since merging runs have
+-- different numbers of input runs/entries, we may have to do more or less
+-- merging work than 1 merge step for each credit.
+scaleCreditsForMerge :: MergingRun m h -> Credit -> ScaledCredits
+-- A single run is a trivially completed merge, so it requires no credits.
+scaleCreditsForMerge SingleRun{} _ = ScaledCredits 0
+-- A levelling merge has 1 input run and one resident run, which is (up to) 4x
+-- bigger than the others. It needs to be completed before another run comes in.
+--
+-- TODO: this is currently assuming a naive worst case, where the resident run
+-- is as large as it can be for the current level. We probably have enough
+-- information available here to lower the worst-case upper bound by looking at
+-- the sizes of the input runs. As a result, merge work would/could be more
+-- evenly distributed over time when the resident run is smaller than the worst
+-- case.
+scaleCreditsForMerge (MergingRun LevelLevelling _ _) c =
+    ScaledCredits (c * (1 + 4))
+-- A tiering merge has 5 runs at most (one could be held back to be merged
+-- again) and must be completed before the level is full (once 4 more runs come
+-- in).
+scaleCreditsForMerge (MergingRun LevelTiering (NumRuns n) _) c =
+    ScaledCredits ((c * n + 3) `div` 4)
+    -- same as division rounding up: ceiling (c * n / 4)
+
+{-# SPECIALISE supplyMergeCredits :: ScaledCredits -> MergingRun IO h -> IO () #-}
+-- TODO: implement doing merge work in batches, instead of always taking the
+-- MVar. The thresholds for doing merge work should be different for each level,
+-- maybe co-prime?
+supplyMergeCredits ::
+     (MonadSTM m, MonadST m, MonadMVar m, MonadMask m, MonadFix m)
+  => ScaledCredits
+  -> MergingRun m h
+  -> m ()
+supplyMergeCredits _ SingleRun{} = pure ()
+supplyMergeCredits (ScaledCredits c) (MergingRun _ _ var) = do
+    mergeIsDone <- withMVar var $ \case
+      CompletedMerge{} -> pure False
+      (OngoingMerge _rs totalStepsVar totalCreditsVar m) -> do
+        totalSteps <- readPrimVar totalStepsVar
+        totalCredits <- readPrimVar totalCreditsVar
+
+        -- If we previously performed too many merge steps, then we perform
+        -- fewer now.
+        let stepsToDo = max 0 (totalCredits + c - totalSteps)
+        -- Merge.steps guarantees that stepsDone >= stepsToDo /unless/ the merge
+        -- was just now finished.
+        (stepsDone, stepResult) <- Merge.steps m stepsToDo
+        assert (case stepResult of
+                  MergeInProgress -> stepsDone >= stepsToDo
+                  MergeComplete   -> True
+               ) $ pure ()
+
+        -- This should be the only point at which we write to these variables.
+        --
+        -- It is guaranteed that totalSteps' >= totalCredits' /unless/ the
+        -- merge was just now finished.
+        let totalSteps' = totalSteps + stepsDone
+        let totalCredits' = totalCredits + c
+        -- If an exception happens between the next two writes, then only
+        -- totalCreditsVar will be outdated, which is okay because we will
+        -- resupply credits. It also means we can maintain that @readPrimVar
+        -- totalStepsVar >= readPrimVar totalCreditsVar@, /unless/ the merge was
+        -- just now finished.
+        writePrimVar totalStepsVar $! totalSteps + stepsDone
+        writePrimVar totalCreditsVar $! totalCredits + c
+        assert (case stepResult of
+                  MergeInProgress -> totalSteps' >= totalCredits'
+                  MergeComplete   -> True
+               ) $ pure ()
+
+        pure $ stepResult == MergeComplete
+    when mergeIsDone $
+      modifyMVarMasked_ var $ \case
+        mr@CompletedMerge{} -> pure $! mr
+        (OngoingMerge rs _totalStepsVar _totalCreditsVar m) -> do
+          -- TODO: we'll likely move away from this style of reference counting,
+          -- so this code will change in the future.
+          RefCount n <- Merge.readRefCount m
+          let !n' = fromIntegralChecked n
+          V.forM_ rs $ \r -> Run.removeReferenceN r n'
+          r <- Merge.complete m
+          Merge.removeReferenceN m n'
+          pure $! CompletedMerge r
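As a reading aid for scaleCreditsForMerge above, the scaling arithmetic can be checked in isolation. This sketch restates the two rules over plain Ints (types simplified; the ghci outputs illustrate the formulas, they are not taken from the codebase):

data MergePolicyForLevel = LevelTiering | LevelLevelling

-- Levelling pays for the worst case of a resident run up to 4x larger than
-- the incoming one; tiering rounds (credits * runs) / 4 up, so the merge is
-- complete by the time 4 more runs arrive at the level.
scale :: MergePolicyForLevel -> Int -> Int -> Int
scale LevelLevelling _numRuns c = c * (1 + 4)
scale LevelTiering    numRuns c = (c * numRuns + 3) `div` 4

-- ghci> scale LevelTiering 4 10   -- 4 input runs, 10 credits
-- 10
-- ghci> scale LevelTiering 5 10   -- 5 runs (one held back to be merged again)
-- 13                              -- ceiling (50 / 4)
-- ghci> scale LevelLevelling 1 10
-- 50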
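Similarly, the totalSteps/totalCredits bookkeeping in supplyMergeCredits can be modelled as a pure state transition. A hedged sketch, ignoring concurrency and assuming the merge does not complete mid-call (in which case Merge.steps performs at least the requested number of steps):

-- One call on an ongoing merge: given newly supplied credits c and the
-- current (steps, credits) totals, compute the steps to run now and the
-- updated totals. The invariant totalSteps >= totalCredits is preserved.
supplyModel :: Int -> (Int, Int) -> (Int, (Int, Int))
supplyModel c (totalSteps, totalCredits) =
  let stepsToDo = max 0 (totalCredits + c - totalSteps)
  in (stepsToDo, (totalSteps + stepsToDo, totalCredits + c))

-- ghci> supplyModel 10 (0, 0)
-- (10,(10,10))   -- fresh merge: run all 10 steps
-- ghci> supplyModel 10 (25, 20)
-- (5,(30,30))    -- 5 steps were already done in advance, so run 5 fewer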
