Skip to content

Commit 361f4dc

Browse files
authored
Merge pull request #426 from IntersectMBO/jdral/scheduled-merges
Base implementation of scheduled merges
2 parents 1678933 + 27107f3 commit 361f4dc

File tree

14 files changed

+378
-64
lines changed

14 files changed

+378
-64
lines changed

src-control/Control/RefCount.hs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Control.RefCount (
1010
, mkRefCounter1
1111
, addReference
1212
, removeReference
13+
, removeReferenceN
1314
, upgradeWeakReference
1415
, readRefCount
1516
) where
@@ -21,7 +22,9 @@ import Control.Monad.Class.MonadThrow
2122
import Control.Monad.Primitive
2223
import Data.Maybe
2324
import Data.Primitive.PrimVar
25+
import Data.Word
2426
import GHC.Stack
27+
import Text.Printf
2528

2629
-- | A reference counter with an optional finaliser action. Once the reference
2730
-- count reaches @0@, the finaliser will be run.
@@ -89,6 +92,43 @@ removeReference RefCounter{countVar, finaliser} = mask_ $ do
8992
assertWithCallStack (prevCount > 0) $ pure ()
9093
when (prevCount == 1) $ sequence_ finaliser
9194

95+
-- TODO: remove uses of this API. Eventually all references should be singular,
96+
-- and not use patterns where if A contains B then N references on A becomes N
97+
-- references on B. Instead this should be a single reference from A to B,
98+
-- irrespective of the number of references to A.
99+
{-# SPECIALISE removeReferenceN :: HasCallStack => RefCounter IO -> Word64 -> IO () #-}
100+
-- | Decrease the reference counter by @n@. @n@ must be a positive number.
101+
--
102+
-- The count must be known (from context) to be non-zero and at least as large
103+
-- as @n@. Typically this will be because the caller has @n@ references already
104+
-- (that they took out themselves or were given).
105+
removeReferenceN :: (HasCallStack, PrimMonad m, MonadMask m) => RefCounter m -> Word64 -> m ()
106+
removeReferenceN RefCounter{countVar, finaliser} n = mask_ $ do
107+
-- n should be positive
108+
assert (n > 0) $ pure ()
109+
let !n' = fromIntegralChecked n
110+
prevCount <- fetchSubInt countVar n'
111+
-- the reference count must not already be 0, because then the finaliser
112+
-- will have run already
113+
assertWithCallStack (prevCount > 0) $ pure ()
114+
-- the reference count can not go below zero
115+
assertWithCallStack (prevCount >= n') $ pure ()
116+
when (prevCount <= n') $ sequence_ finaliser
117+
118+
-- TODO: remove when removeReferenceN is removed
119+
{-# INLINABLE fromIntegralChecked #-}
120+
-- | Like 'fromIntegral', but throws an error when @(x :: a) /= fromIntegral
121+
-- (fromIntegral x :: b)@.
122+
fromIntegralChecked :: (HasCallStack, Integral a, Integral b, Show a) => a -> b
123+
fromIntegralChecked x
124+
| x'' == x
125+
= x'
126+
| otherwise
127+
= error $ printf "fromIntegralChecked: conversion failed, %s /= %s" (show x) (show x'')
128+
where
129+
x' = fromIntegral x
130+
x'' = fromIntegral x'
131+
92132
-- | Try to turn a \"weak\" reference on something into a proper reference.
93133
-- This is by analogy with @deRefWeak :: Weak v -> IO (Maybe v)@, but for
94134
-- reference counts.
@@ -120,6 +160,7 @@ upgradeWeakReference RefCounter{countVar} = do
120160
then return True
121161
else casLoop prevCount'
122162

163+
-- TODO: remove when removeRefenceN is removed
123164
{-# SPECIALISE readRefCount :: RefCounter IO -> IO RefCount #-}
124165
-- | Warning: reading the current reference count is inherently racy as there is
125166
-- no way to reliably act on the information. It can be useful for debugging.

src-extras/Database/LSMTree/Extras/NoThunks.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,12 @@ deriving stock instance Generic (MergingRunState m h)
277277
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
278278
=> NoThunks (MergingRunState m h)
279279

280+
deriving stock instance Generic MergePolicyForLevel
281+
deriving anyclass instance NoThunks MergePolicyForLevel
282+
283+
deriving stock instance Generic NumRuns
284+
deriving anyclass instance NoThunks NumRuns
285+
280286
{-------------------------------------------------------------------------------
281287
Entry
282288
-------------------------------------------------------------------------------}

src/Database/LSMTree/Internal.hs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
9696
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
9797
import Database.LSMTree.Internal.Config
9898
import qualified Database.LSMTree.Internal.Cursor as Cursor
99-
import Database.LSMTree.Internal.Entry (Entry)
99+
import Database.LSMTree.Internal.Entry (Entry, unNumEntries)
100100
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
101101
ResolveSerialisedValue, lookupsIO)
102102
import Database.LSMTree.Internal.MergeSchedule
@@ -1108,6 +1108,12 @@ snapshot resolve snap label th = do
11081108
(RW.unsafeAcquireWriteAccess (tableContent thEnv))
11091109
(atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv))
11101110
$ \reg content -> do
1111+
-- TODO: When we flush the buffer here, it might be underfull, which
1112+
-- could mess up the scheduling. The conservative approach is to supply
1113+
-- credits as if the buffer was full, and then flush the (possibly)
1114+
-- underfull buffer. However, note that this bit of code
1115+
-- here is probably going to change anyway because of #392
1116+
supplyCredits (unNumEntries $ case confWriteBufferAlloc conf of AllocNumEntries x -> x) (tableLevels content)
11111117
content' <- flushWriteBuffer
11121118
(TraceMerge `contramap` tableTracer th)
11131119
conf
@@ -1140,6 +1146,7 @@ snapshot resolve snap label th = do
11401146
-> SnapshotLabel
11411147
-> TableConfigOverride
11421148
-> SnapshotName
1149+
-> ResolveSerialisedValue
11431150
-> IO (TableHandle IO h) #-}
11441151
-- | See 'Database.LSMTree.Normal.open'.
11451152
open ::
@@ -1148,8 +1155,9 @@ open ::
11481155
-> SnapshotLabel -- ^ Expected label
11491156
-> TableConfigOverride -- ^ Optional config override
11501157
-> SnapshotName
1158+
-> ResolveSerialisedValue
11511159
-> m (TableHandle m h)
1152-
open sesh label override snap = do
1160+
open sesh label override snap resolve = do
11531161
traceWith (sessionTracer sesh) $ TraceOpenSnapshot snap override
11541162
withOpenSession sesh $ \seshEnv -> do
11551163
withTempRegistry $ \reg -> do
@@ -1173,7 +1181,7 @@ open sesh label override snap = do
11731181
<- allocateTemp reg
11741182
(WBB.new hfs blobpath)
11751183
WBB.removeReference
1176-
tableLevels <- openLevels reg hfs hbio conf (sessionRoot seshEnv) snappedLevels
1184+
tableLevels <- openLevels reg hfs hbio conf (sessionUniqCounter seshEnv) (sessionRoot seshEnv) resolve snappedLevels
11771185
tableCache <- mkLevelsCache reg tableLevels
11781186
newWith reg sesh seshEnv conf' am $! TableContent {
11791187
tableWriteBuffer = WB.empty

src/Database/LSMTree/Internal/Config.hs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,6 @@ instance NFData MergeSchedule where
409409
-- | The default 'MergeSchedule'.
410410
--
411411
-- >>> defaultMergeSchedule
412-
-- OneShot
413-
--
414-
-- TODO: replace by 'Incremental'
412+
-- Incremental
415413
defaultMergeSchedule :: MergeSchedule
416-
defaultMergeSchedule = OneShot
414+
defaultMergeSchedule = Incremental

src/Database/LSMTree/Internal/Merge.hs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ module Database.LSMTree.Internal.Merge (
99
, new
1010
, addReference
1111
, removeReference
12+
, removeReferenceN
13+
, readRefCount
1214
, complete
1315
, stepsToCompletion
1416
, stepsToCompletionCounted
@@ -24,12 +26,13 @@ import Control.Monad.Class.MonadThrow (MonadCatch, MonadMask (..),
2426
MonadThrow (..))
2527
import Control.Monad.Fix (MonadFix)
2628
import Control.Monad.Primitive (PrimMonad, PrimState, RealWorld)
27-
import Control.RefCount (RefCounter)
29+
import Control.RefCount (RefCount (..), RefCounter)
2830
import qualified Control.RefCount as RC
2931
import Data.Coerce (coerce)
3032
import Data.Primitive.MutVar
3133
import Data.Traversable (for)
3234
import qualified Data.Vector as V
35+
import Data.Word
3336
import Database.LSMTree.Internal.BlobRef (BlobRef)
3437
import Database.LSMTree.Internal.Entry
3538
import Database.LSMTree.Internal.Run (Run, RunDataCaching)
@@ -130,6 +133,14 @@ addReference Merge{..} = RC.addReference mergeRefCounter
130133
removeReference :: (HasCallStack, PrimMonad m, MonadMask m) => Merge m h -> m ()
131134
removeReference Merge{..} = RC.removeReference mergeRefCounter
132135

136+
{-# SPECIALISE removeReferenceN :: Merge IO h -> Word64 -> IO () #-}
137+
removeReferenceN :: (HasCallStack, PrimMonad m, MonadMask m) => Merge m h -> Word64 -> m ()
138+
removeReferenceN r = RC.removeReferenceN (mergeRefCounter r)
139+
140+
{-# SPECIALISE readRefCount :: Merge IO h -> IO RefCount #-}
141+
readRefCount :: PrimMonad m => Merge m h -> m RefCount
142+
readRefCount Merge{..} = RC.readRefCount mergeRefCounter
143+
133144
{-# SPECIALISE finaliser ::
134145
MutVar RealWorld MergeState
135146
-> RunBuilder IO h
@@ -239,6 +250,7 @@ stepsToCompletionCounted m stepBatchSize = go 0
239250
in (stepsSum',) <$> complete m
240251

241252
data StepResult = MergeInProgress | MergeComplete
253+
deriving stock Eq
242254

243255
stepsInvariant :: Int -> (Int, StepResult) -> Bool
244256
stepsInvariant requestedSteps = \case

0 commit comments

Comments
 (0)