Skip to content

Commit 008dbce

Browse files
authored
Merge pull request #633 from IntersectMBO/mheinzel/union-debt
Implement remainingUnionDebt and supplyUnionCredits
2 parents 04e58c7 + f081fb1 commit 008dbce

File tree

21 files changed

+631
-187
lines changed

21 files changed

+631
-187
lines changed

bench/macro/lsm-tree-bench-lookups.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ lookupsEnv runSizes keyRng0 hfs hbio caching = do
377377
putStr "DONE"
378378

379379
-- return runs
380-
runs <- V.fromList <$> mapM Run.fromMutable rbs
380+
runs <- V.fromList <$> mapM Run.fromBuilder rbs
381381
let blooms = V.map (\(DeRef r) -> Run.runFilter r) runs
382382
indexes = V.map (\(DeRef r) -> Run.runIndex r) runs
383383
handles = V.map (\(DeRef r) -> Run.runKOpsFile r) runs

bench/micro/Bench/Database/LSMTree/Internal/Index.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ import Control.DeepSeq (rnf)
77
import Control.Monad.ST.Strict (runST)
88
import Criterion.Main (Benchmark, Benchmarkable, bench, bgroup, env,
99
whnf)
10-
#if __GLASGOW_HASKELL__ < 910
10+
#if !MIN_VERSION_base(4,20,0)
1111
import Data.List (foldl')
12+
-- foldl' is included in the Prelude from base 4.20 onwards
1213
#endif
1314
import Database.LSMTree.Extras.Generators (getKeyForIndexCompact,
1415
mkPages, toAppends)

prototypes/ScheduledMerges.hs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,8 +1371,17 @@ remainingDebtMergingTree :: MergingTree s -> ST s (Debt, Size)
13711371
remainingDebtMergingTree (MergingTree ref) =
13721372
readSTRef ref >>= \case
13731373
CompletedTreeMerge r -> return (0, runSize r)
1374-
OngoingTreeMerge mr -> remainingDebtMergingRun mr
1375-
PendingTreeMerge pm -> remainingDebtPendingMerge pm
1374+
OngoingTreeMerge mr -> addDebtOne <$> remainingDebtMergingRun mr
1375+
PendingTreeMerge pm -> addDebtOne <$> remainingDebtPendingMerge pm
1376+
where
1377+
-- An ongoing merge should never have 0 debt, even if the 'MergingRun' in it
1378+
-- says it is completed. We still need to update it to 'CompletedTreeMerge'.
1379+
-- Similarly, a pending merge needs some work to complete it, even if all
1380+
-- its inputs are empty.
1381+
--
1382+
-- Note that we can't use @max 1@, as this would violate the property that
1383+
-- supplying N credits reduces the remaining debt by at least N.
1384+
addDebtOne (debt, size) = (debt + 1, size)
13761385

13771386
remainingDebtPendingMerge :: PendingMerge s -> ST s (Debt, Size)
13781387
remainingDebtPendingMerge (PendingMerge _ prs trees) = do
@@ -1381,11 +1390,7 @@ remainingDebtPendingMerge (PendingMerge _ prs trees) = do
13811390
, traverse remainingDebtMergingTree trees
13821391
]
13831392
let totalSize = sum sizes
1384-
-- A pending merge should never have 0 remaining debt. It needs some work to
1385-
-- complete it, even if all its inputs are empty. It's not enought to use
1386-
-- @max 1@, as this would violate the property that supplying N credits
1387-
-- reduces the remaining debt by at least N.
1388-
let totalDebt = sum debts + totalSize + 1
1393+
let totalDebt = sum debts + totalSize
13891394
return (totalDebt, totalSize)
13901395
where
13911396
remainingDebtPreExistingRun = \case
@@ -1478,8 +1483,9 @@ supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits -> \cas
14781483
-- approximately equal, being more precise would require more iterations
14791484
splitEqually :: (Credit -> a -> ST s Credit) -> [a] -> Credit -> ST s Credit
14801485
splitEqually f xs credits =
1481-
-- first give each tree k = ceil(1/n) credits (last ones might get less)
1482-
-- any remainders go left to right
1486+
-- first give each tree k = ceil(1/n) credits (last ones might get less).
1487+
-- it's important we fold here to collect leftovers.
1488+
-- any remainders go left to right.
14831489
foldM supply credits xs >>= leftToRight f xs
14841490
where
14851491
!n = length xs

src-extras/Database/LSMTree/Extras/RunData.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ unsafeCreateRunAt ::
149149
-> SerialisedRunData
150150
-> IO (Ref (Run IO h))
151151
unsafeCreateRunAt fs hbio runParams fsPaths (RunData m) = do
152+
-- the WBB file path doesn't have to be at a specific place relative to
153+
-- the run we want to create, but fsPaths should already point to a unique
154+
-- location, so we just append something to not conflict with that.
152155
let blobpath = FS.addExtension (runBlobPath fsPaths) ".wb"
153156
bracket (WBB.new fs blobpath) releaseRef $ \wbblobs -> do
154157
wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob fs wbblobs)) m

src/Database/LSMTree.hs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -564,15 +564,19 @@ remainingUnionDebt (Internal.Table' t) =
564564
(\(Internal.UnionDebt x) -> UnionDebt x) <$>
565565
Internal.remainingUnionDebt t
566566

567-
{-# SPECIALISE supplyUnionCredits :: Table IO k v b -> UnionCredits -> IO UnionCredits #-}
567+
{-# SPECIALISE supplyUnionCredits ::
568+
ResolveValue v => Table IO k v b -> UnionCredits -> IO UnionCredits #-}
568569
supplyUnionCredits ::
569-
IOLike m
570+
forall m k v b. (IOLike m, ResolveValue v)
570571
=> Table m k v b
571572
-> UnionCredits
572573
-> m UnionCredits
573574
supplyUnionCredits (Internal.Table' t) (UnionCredits credits) =
574575
(\(Internal.UnionCredits x) -> UnionCredits x) <$>
575-
Internal.supplyUnionCredits t (Internal.UnionCredits credits)
576+
Internal.supplyUnionCredits
577+
(resolve (Proxy @v))
578+
t
579+
(Internal.UnionCredits credits)
576580

577581
{-------------------------------------------------------------------------------
578582
Monoidal value resolution

src/Database/LSMTree/Internal.hs

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,15 @@ import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
105105
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
106106
import Database.LSMTree.Internal.Config
107107
import qualified Database.LSMTree.Internal.Cursor as Cursor
108-
import Database.LSMTree.Internal.Entry (Entry)
108+
import Database.LSMTree.Internal.Entry (Entry, NumEntries (..))
109109
import Database.LSMTree.Internal.IncomingRun (IncomingRun (..))
110110
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
111111
ResolveSerialisedValue, lookupsIO,
112112
lookupsIOWithoutWriteBuffer)
113113
import Database.LSMTree.Internal.MergeSchedule
114114
import qualified Database.LSMTree.Internal.MergingRun as MR
115115
import Database.LSMTree.Internal.MergingTree
116+
import qualified Database.LSMTree.Internal.MergingTree as MT
116117
import qualified Database.LSMTree.Internal.MergingTree.Lookup as MT
117118
import Database.LSMTree.Internal.Paths (SessionRoot (..),
118119
SnapshotMetaDataChecksumFile (..),
@@ -1626,32 +1627,56 @@ newtype UnionDebt = UnionDebt Int
16261627

16271628
{-# SPECIALISE remainingUnionDebt :: Table IO h -> IO UnionDebt #-}
16281629
-- | See 'Database.LSMTree.Normal.remainingUnionDebt'.
1629-
remainingUnionDebt :: (MonadSTM m, MonadThrow m) => Table m h -> m UnionDebt
1630+
remainingUnionDebt ::
1631+
(MonadSTM m, MonadMVar m, MonadThrow m, PrimMonad m)
1632+
=> Table m h -> m UnionDebt
16301633
remainingUnionDebt t = do
16311634
traceWith (tableTracer t) TraceRemainingUnionDebt
16321635
withOpenTable t $ \tEnv -> do
1633-
RW.withReadAccess (tableContent tEnv) $ \tableContent ->
1636+
RW.withReadAccess (tableContent tEnv) $ \tableContent -> do
16341637
case tableUnionLevel tableContent of
1635-
NoUnion -> pure (UnionDebt 0)
1636-
Union{} -> error "remainingUnionDebt: not yet implemented"
1638+
NoUnion ->
1639+
pure (UnionDebt 0)
1640+
Union mt -> do
1641+
(MergeDebt (MergeCredits c), _) <- MT.remainingMergeDebt mt
1642+
pure (UnionDebt c)
16371643

16381644
-- | See 'Database.LSMTree.Normal.UnionCredits'.
16391645
newtype UnionCredits = UnionCredits Int
16401646
deriving newtype (Show, Eq, Ord, Num)
16411647

1642-
{-# SPECIALISE supplyUnionCredits :: Table IO h -> UnionCredits -> IO UnionCredits #-}
1648+
{-# SPECIALISE supplyUnionCredits ::
1649+
ResolveSerialisedValue -> Table IO h -> UnionCredits -> IO UnionCredits #-}
16431650
-- | See 'Database.LSMTree.Normal.supplyUnionCredits'.
1644-
supplyUnionCredits :: (MonadSTM m, MonadCatch m) => Table m h -> UnionCredits -> m UnionCredits
1645-
supplyUnionCredits t credits = do
1651+
supplyUnionCredits ::
1652+
(MonadST m, MonadSTM m, MonadMVar m, MonadMask m)
1653+
=> ResolveSerialisedValue -> Table m h -> UnionCredits -> m UnionCredits
1654+
supplyUnionCredits resolve t credits = do
16461655
traceWith (tableTracer t) $ TraceSupplyUnionCredits credits
16471656
withOpenTable t $ \tEnv -> do
1648-
-- TODO: should this be acquiring read or write access?
1649-
RW.withWriteAccess (tableContent tEnv) $ \tableContent ->
1657+
-- No need to mutate the table content here. In the rare case that we want
1658+
-- to move a completed union level into the regular levels, we can still
1659+
-- take the write lock for that.
1660+
RW.withReadAccess (tableContent tEnv) $ \tableContent -> do
16501661
case tableUnionLevel tableContent of
1651-
NoUnion -> pure (tableContent, credits) -- all leftovers
1652-
Union{}
1653-
| credits <= UnionCredits 0 -> pure (tableContent, UnionCredits 0)
1654-
--TODO: remove this 0 special case once the general case covers it.
1655-
-- We do not need to optimise the 0 case. It is just here to
1656-
-- simplify test coverage.
1657-
| otherwise -> error "supplyUnionCredits: not yet implemented"
1662+
NoUnion ->
1663+
pure (max 0 credits) -- all leftovers (but never negative)
1664+
Union mt -> do
1665+
let conf = tableConfig t
1666+
let AllocNumEntries (NumEntries x) = confWriteBufferAlloc conf
1667+
-- We simply use the write buffer size as merge credit threshold, as
1668+
-- the regular level merges also do.
1669+
-- TODO: pick a more suitable threshold or make configurable?
1670+
let thresh = MR.CreditThreshold (MR.UnspentCredits (MergeCredits x))
1671+
MergeCredits leftovers <-
1672+
MT.supplyCredits
1673+
(tableHasFS tEnv)
1674+
(tableHasBlockIO tEnv)
1675+
resolve
1676+
(runParamsForLevel conf UnionLevel)
1677+
thresh
1678+
(tableSessionRoot tEnv)
1679+
(tableSessionUniqCounter tEnv)
1680+
mt
1681+
(let UnionCredits c = credits in MergeCredits c)
1682+
pure (UnionCredits leftovers)

src/Database/LSMTree/Internal/Config.hs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ module Database.LSMTree.Internal.Config (
33
-- * Table configuration
44
, TableConfig (..)
55
, defaultTableConfig
6+
, RunLevelNo (..)
7+
, runParamsForLevel
68
-- * Table configuration override
79
, TableConfigOverride
810
, applyOverride
@@ -34,16 +36,16 @@ import Control.DeepSeq (NFData (..))
3436
import Data.Maybe (fromMaybe)
3537
import Data.Monoid (Last (..))
3638
import Data.Word (Word64)
37-
import Database.LSMTree.Internal.Assertions (assert)
3839
import Database.LSMTree.Internal.Entry (NumEntries (..))
3940
import Database.LSMTree.Internal.Index (IndexType)
4041
import qualified Database.LSMTree.Internal.Index as Index
4142
(IndexType (Compact, Ordinary))
4243
import Database.LSMTree.Internal.Run (RunDataCaching (..))
4344
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
45+
import Database.LSMTree.Internal.RunBuilder (RunParams (..))
4446

4547
newtype LevelNo = LevelNo Int
46-
deriving stock (Show, Eq)
48+
deriving stock (Show, Eq, Ord)
4749
deriving newtype (Enum, NFData)
4850

4951
{-------------------------------------------------------------------------------
@@ -95,6 +97,16 @@ defaultTableConfig =
9597
, confMergeSchedule = defaultMergeSchedule
9698
}
9799

100+
data RunLevelNo = RegularLevel LevelNo | UnionLevel
101+
102+
runParamsForLevel :: TableConfig -> RunLevelNo -> RunParams
103+
runParamsForLevel conf@TableConfig {..} levelNo =
104+
RunParams
105+
{ runParamCaching = diskCachePolicyForLevel confDiskCachePolicy levelNo
106+
, runParamAlloc = bloomFilterAllocForLevel conf levelNo
107+
, runParamIndex = indexTypeForRun confFencePointerIndex
108+
}
109+
98110
{-------------------------------------------------------------------------------
99111
Table configuration override
100112
-------------------------------------------------------------------------------}
@@ -227,9 +239,8 @@ instance NFData BloomFilterAlloc where
227239
defaultBloomFilterAlloc :: BloomFilterAlloc
228240
defaultBloomFilterAlloc = AllocFixed 10
229241

230-
bloomFilterAllocForLevel :: TableConfig -> LevelNo -> RunBloomFilterAlloc
231-
bloomFilterAllocForLevel conf (LevelNo l) =
232-
assert (l > 0) $
242+
bloomFilterAllocForLevel :: TableConfig -> RunLevelNo -> RunBloomFilterAlloc
243+
bloomFilterAllocForLevel conf _levelNo =
233244
case confBloomFilterAlloc conf of
234245
AllocFixed n -> RunAllocFixed n
235246
AllocRequestFPR fpr -> RunAllocRequestFPR fpr
@@ -326,15 +337,16 @@ instance NFData DiskCachePolicy where
326337
-- | Interpret the 'DiskCachePolicy' for a level: should we cache data in runs
327338
-- at this level.
328339
--
329-
diskCachePolicyForLevel :: DiskCachePolicy -> LevelNo -> RunDataCaching
330-
diskCachePolicyForLevel policy (LevelNo ln) =
340+
diskCachePolicyForLevel :: DiskCachePolicy -> RunLevelNo -> RunDataCaching
341+
diskCachePolicyForLevel policy levelNo =
331342
case policy of
332-
DiskCacheAll -> CacheRunData
333-
DiskCacheNone -> NoCacheRunData
334-
DiskCacheLevelsAtOrBelow n
335-
| ln <= n -> CacheRunData
336-
| otherwise -> NoCacheRunData
337-
343+
DiskCacheAll -> CacheRunData
344+
DiskCacheNone -> NoCacheRunData
345+
DiskCacheLevelsAtOrBelow n ->
346+
case levelNo of
347+
RegularLevel l | l <= LevelNo n -> CacheRunData
348+
| otherwise -> NoCacheRunData
349+
UnionLevel -> NoCacheRunData
338350

339351
{-------------------------------------------------------------------------------
340352
Merge schedule

src/Database/LSMTree/Internal/Merge.hs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ new ::
174174
new hfs hbio runParams mergeType mergeMappend targetPaths runs = do
175175
-- no offset, no write buffer
176176
mreaders <- Readers.new Readers.NoOffsetKey Nothing runs
177+
-- TODO: Exception safety! If Readers.new fails after already creating some
178+
-- run readers, or Builder.new fails, the run readers will stay open,
179+
-- holding handles of the input runs' files.
177180
for mreaders $ \mergeReaders -> do
178181
-- calculate upper bounds based on input runs
179182
let numEntries = V.foldMap' Run.size runs
@@ -236,7 +239,7 @@ complete Merge{..} = do
236239
Merging -> error "complete: Merge is not done"
237240
MergingDone -> do
238241
-- the readers are already drained, therefore closed
239-
r <- Run.fromMutable mergeBuilder
242+
r <- Run.fromBuilder mergeBuilder
240243
writeMutVar mergeState $! Completed
241244
pure r
242245
Completed -> error "complete: Merge is already completed"

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -762,18 +762,10 @@ mergingRunParamsForLevel ::
762762
-> Unique
763763
-> LevelNo
764764
-> (RunParams, RunFsPaths)
765-
mergingRunParamsForLevel dir
766-
conf@TableConfig {
767-
confDiskCachePolicy,
768-
confFencePointerIndex
769-
}
770-
unique ln =
771-
(RunParams {..}, runPaths)
765+
mergingRunParamsForLevel dir conf unique ln =
766+
(runParamsForLevel conf (RegularLevel ln), runPaths)
772767
where
773-
!runParamCaching = diskCachePolicyForLevel confDiskCachePolicy ln
774-
!runParamAlloc = bloomFilterAllocForLevel conf ln
775-
!runParamIndex = indexTypeForRun confFencePointerIndex
776-
!runPaths = Paths.runPath dir (uniqueToRunNumber unique)
768+
!runPaths = Paths.runPath dir (uniqueToRunNumber unique)
777769

778770
-- | We use levelling on the last level, unless that is also the first level.
779771
mergePolicyForLevel ::

0 commit comments

Comments
 (0)