Skip to content

Commit 8a0006c

Browse files
committed
implement supplyUnionCredits
1 parent 49028c5 commit 8a0006c

File tree

11 files changed

+284
-83
lines changed

11 files changed

+284
-83
lines changed

bench/micro/Bench/Database/LSMTree/Internal/Index.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ import Control.DeepSeq (rnf)
77
import Control.Monad.ST.Strict (runST)
88
import Criterion.Main (Benchmark, Benchmarkable, bench, bgroup, env,
99
whnf)
10-
#if __GLASGOW_HASKELL__ < 910
10+
#if !MIN_VERSION_base(4,20,0)
1111
import Data.List (foldl')
12+
-- foldl' is included in the Prelude from base 4.20 onwards
1213
#endif
1314
import Database.LSMTree.Extras.Generators (getKeyForIndexCompact,
1415
mkPages, toAppends)

prototypes/ScheduledMerges.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1483,8 +1483,9 @@ supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits -> \cas
14831483
-- approximately equal, being more precise would require more iterations
14841484
splitEqually :: (Credit -> a -> ST s Credit) -> [a] -> Credit -> ST s Credit
14851485
splitEqually f xs credits =
1486-
-- first give each tree k = ceil(1/n) credits (last ones might get less)
1487-
-- any remainders go left to right
1486+
-- first give each tree k = ceil(1/n) credits (last ones might get less).
1487+
-- it's important we fold here to collect leftovers.
1488+
-- any remainders go left to right.
14881489
foldM supply credits xs >>= leftToRight f xs
14891490
where
14901491
!n = length xs

src/Database/LSMTree.hs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -564,15 +564,19 @@ remainingUnionDebt (Internal.Table' t) =
564564
(\(Internal.UnionDebt x) -> UnionDebt x) <$>
565565
Internal.remainingUnionDebt t
566566

567-
{-# SPECIALISE supplyUnionCredits :: Table IO k v b -> UnionCredits -> IO UnionCredits #-}
567+
{-# SPECIALISE supplyUnionCredits ::
568+
ResolveValue v => Table IO k v b -> UnionCredits -> IO UnionCredits #-}
568569
supplyUnionCredits ::
569-
IOLike m
570+
forall m k v b. (IOLike m, ResolveValue v)
570571
=> Table m k v b
571572
-> UnionCredits
572573
-> m UnionCredits
573574
supplyUnionCredits (Internal.Table' t) (UnionCredits credits) =
574575
(\(Internal.UnionCredits x) -> UnionCredits x) <$>
575-
Internal.supplyUnionCredits t (Internal.UnionCredits credits)
576+
Internal.supplyUnionCredits
577+
(resolve (Proxy @v))
578+
t
579+
(Internal.UnionCredits credits)
576580

577581
{-------------------------------------------------------------------------------
578582
Monoidal value resolution

src/Database/LSMTree/Internal.hs

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
105105
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
106106
import Database.LSMTree.Internal.Config
107107
import qualified Database.LSMTree.Internal.Cursor as Cursor
108-
import Database.LSMTree.Internal.Entry (Entry)
108+
import Database.LSMTree.Internal.Entry (Entry, NumEntries (..))
109109
import Database.LSMTree.Internal.IncomingRun (IncomingRun (..))
110110
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
111111
ResolveSerialisedValue, lookupsIO,
@@ -1645,19 +1645,38 @@ remainingUnionDebt t = do
16451645
newtype UnionCredits = UnionCredits Int
16461646
deriving newtype (Show, Eq, Ord, Num)
16471647

1648-
{-# SPECIALISE supplyUnionCredits :: Table IO h -> UnionCredits -> IO UnionCredits #-}
1648+
{-# SPECIALISE supplyUnionCredits ::
1649+
ResolveSerialisedValue -> Table IO h -> UnionCredits -> IO UnionCredits #-}
16491650
-- | See 'Database.LSMTree.Normal.supplyUnionCredits'.
1650-
supplyUnionCredits :: (MonadSTM m, MonadCatch m) => Table m h -> UnionCredits -> m UnionCredits
1651-
supplyUnionCredits t credits = do
1651+
supplyUnionCredits ::
1652+
(MonadST m, MonadSTM m, MonadMVar m, MonadMask m)
1653+
=> ResolveSerialisedValue -> Table m h -> UnionCredits -> m UnionCredits
1654+
supplyUnionCredits resolve t credits = do
16521655
traceWith (tableTracer t) $ TraceSupplyUnionCredits credits
16531656
withOpenTable t $ \tEnv -> do
1654-
-- TODO: should this be acquiring read or write access?
1655-
RW.withWriteAccess (tableContent tEnv) $ \tableContent ->
1657+
-- No need to mutate the table content here. In the rare case that we want
1658+
-- to move a completed union level into the regular levels, we can still
1659+
-- take the write lock for that.
1660+
RW.withReadAccess (tableContent tEnv) $ \tableContent -> do
16561661
case tableUnionLevel tableContent of
1657-
NoUnion -> pure (tableContent, credits) -- all leftovers
1658-
Union{}
1659-
| credits <= UnionCredits 0 -> pure (tableContent, UnionCredits 0)
1660-
--TODO: remove this 0 special case once the general case covers it.
1661-
-- We do not need to optimise the 0 case. It is just here to
1662-
-- simplify test coverage.
1663-
| otherwise -> error "supplyUnionCredits: not yet implemented"
1662+
NoUnion ->
1663+
pure (max 0 credits) -- all leftovers (but never negative)
1664+
Union mt -> do
1665+
let conf = tableConfig t
1666+
let AllocNumEntries (NumEntries x) = confWriteBufferAlloc conf
1667+
-- We simply use the write buffer size as merge credit threshold, as
1668+
-- the regular level merges also do.
1669+
-- TODO: pick a more suitable threshold or make configurable?
1670+
let thresh = MR.CreditThreshold (MR.UnspentCredits (MergeCredits x))
1671+
MergeCredits leftovers <-
1672+
MT.supplyCredits
1673+
(tableHasFS tEnv)
1674+
(tableHasBlockIO tEnv)
1675+
resolve
1676+
(runParamsForLevel conf UnionLevel)
1677+
thresh
1678+
(tableSessionRoot tEnv)
1679+
(tableSessionUniqCounter tEnv)
1680+
mt
1681+
(let UnionCredits c = credits in MergeCredits c)
1682+
pure (UnionCredits leftovers)

src/Database/LSMTree/Internal/Config.hs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ module Database.LSMTree.Internal.Config (
33
-- * Table configuration
44
, TableConfig (..)
55
, defaultTableConfig
6+
, RunLevelNo (..)
7+
, runParamsForLevel
68
-- * Table configuration override
79
, TableConfigOverride
810
, applyOverride
@@ -34,16 +36,16 @@ import Control.DeepSeq (NFData (..))
3436
import Data.Maybe (fromMaybe)
3537
import Data.Monoid (Last (..))
3638
import Data.Word (Word64)
37-
import Database.LSMTree.Internal.Assertions (assert)
3839
import Database.LSMTree.Internal.Entry (NumEntries (..))
3940
import Database.LSMTree.Internal.Index (IndexType)
4041
import qualified Database.LSMTree.Internal.Index as Index
4142
(IndexType (Compact, Ordinary))
4243
import Database.LSMTree.Internal.Run (RunDataCaching (..))
4344
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
45+
import Database.LSMTree.Internal.RunBuilder (RunParams (..))
4446

4547
newtype LevelNo = LevelNo Int
46-
deriving stock (Show, Eq)
48+
deriving stock (Show, Eq, Ord)
4749
deriving newtype (Enum, NFData)
4850

4951
{-------------------------------------------------------------------------------
@@ -95,6 +97,16 @@ defaultTableConfig =
9597
, confMergeSchedule = defaultMergeSchedule
9698
}
9799

100+
data RunLevelNo = RegularLevel LevelNo | UnionLevel
101+
102+
runParamsForLevel :: TableConfig -> RunLevelNo -> RunParams
103+
runParamsForLevel conf@TableConfig {..} levelNo =
104+
RunParams
105+
{ runParamCaching = diskCachePolicyForLevel confDiskCachePolicy levelNo
106+
, runParamAlloc = bloomFilterAllocForLevel conf levelNo
107+
, runParamIndex = indexTypeForRun confFencePointerIndex
108+
}
109+
98110
{-------------------------------------------------------------------------------
99111
Table configuration override
100112
-------------------------------------------------------------------------------}
@@ -227,9 +239,8 @@ instance NFData BloomFilterAlloc where
227239
defaultBloomFilterAlloc :: BloomFilterAlloc
228240
defaultBloomFilterAlloc = AllocFixed 10
229241

230-
bloomFilterAllocForLevel :: TableConfig -> LevelNo -> RunBloomFilterAlloc
231-
bloomFilterAllocForLevel conf (LevelNo l) =
232-
assert (l > 0) $
242+
bloomFilterAllocForLevel :: TableConfig -> RunLevelNo -> RunBloomFilterAlloc
243+
bloomFilterAllocForLevel conf _levelNo =
233244
case confBloomFilterAlloc conf of
234245
AllocFixed n -> RunAllocFixed n
235246
AllocRequestFPR fpr -> RunAllocRequestFPR fpr
@@ -326,15 +337,16 @@ instance NFData DiskCachePolicy where
326337
-- | Interpret the 'DiskCachePolicy' for a level: should we cache data in runs
327338
-- at this level.
328339
--
329-
diskCachePolicyForLevel :: DiskCachePolicy -> LevelNo -> RunDataCaching
330-
diskCachePolicyForLevel policy (LevelNo ln) =
340+
diskCachePolicyForLevel :: DiskCachePolicy -> RunLevelNo -> RunDataCaching
341+
diskCachePolicyForLevel policy levelNo =
331342
case policy of
332-
DiskCacheAll -> CacheRunData
333-
DiskCacheNone -> NoCacheRunData
334-
DiskCacheLevelsAtOrBelow n
335-
| ln <= n -> CacheRunData
336-
| otherwise -> NoCacheRunData
337-
343+
DiskCacheAll -> CacheRunData
344+
DiskCacheNone -> NoCacheRunData
345+
DiskCacheLevelsAtOrBelow n ->
346+
case levelNo of
347+
RegularLevel l | l <= LevelNo n -> CacheRunData
348+
| otherwise -> NoCacheRunData
349+
UnionLevel -> NoCacheRunData
338350

339351
{-------------------------------------------------------------------------------
340352
Merge schedule

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -762,18 +762,10 @@ mergingRunParamsForLevel ::
762762
-> Unique
763763
-> LevelNo
764764
-> (RunParams, RunFsPaths)
765-
mergingRunParamsForLevel dir
766-
conf@TableConfig {
767-
confDiskCachePolicy,
768-
confFencePointerIndex
769-
}
770-
unique ln =
771-
(RunParams {..}, runPaths)
765+
mergingRunParamsForLevel dir conf unique ln =
766+
(runParamsForLevel conf (RegularLevel ln), runPaths)
772767
where
773-
!runParamCaching = diskCachePolicyForLevel confDiskCachePolicy ln
774-
!runParamAlloc = bloomFilterAllocForLevel conf ln
775-
!runParamIndex = indexTypeForRun confFencePointerIndex
776-
!runPaths = Paths.runPath dir (uniqueToRunNumber unique)
768+
!runPaths = Paths.runPath dir (uniqueToRunNumber unique)
777769

778770
-- | We use levelling on the last level, unless that is also the first level.
779771
mergePolicyForLevel ::

0 commit comments

Comments
 (0)