|
| 1 | +{-# LANGUAGE CPP #-} |
| 2 | +{-# LANGUAGE MagicHash #-} |
| 3 | +{-# LANGUAGE UnboxedTuples #-} |
| 4 | + |
| 5 | +#if !(MIN_VERSION_GLASGOW_HASKELL(9,0,0,0)) |
| 6 | +-- Fix for ghc 8.10.x with deriving newtype Prim |
| 7 | +{-# LANGUAGE DataKinds #-} |
| 8 | +#endif |
| 9 | + |
| 10 | +module Database.LSMTree.Internal.IncomingRun ( |
| 11 | + IncomingRun (..) |
| 12 | + , MergePolicyForLevel (..) |
| 13 | + , duplicateIncomingRun |
| 14 | + , releaseIncomingRun |
| 15 | + , newIncomingSingleRun |
| 16 | + , newIncomingMergingRun |
| 17 | + , snapshotIncomingRun |
| 18 | + |
| 19 | + -- * Credits and credit tracking |
| 20 | + -- $credittracking |
| 21 | + , NominalDebt (..) |
| 22 | + , NominalCredits (..) |
| 23 | + , nominalDebtAsCredits |
| 24 | + , supplyCreditsIncomingRun |
| 25 | + , immediatelyCompleteIncomingRun |
| 26 | + ) where |
| 27 | + |
| 28 | +import Control.ActionRegistry |
| 29 | +import Control.Concurrent.Class.MonadMVar.Strict |
| 30 | +import Control.DeepSeq (NFData (..)) |
| 31 | +import Control.Monad.Class.MonadST (MonadST) |
| 32 | +import Control.Monad.Class.MonadSTM (MonadSTM (..)) |
| 33 | +import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..)) |
| 34 | +import Control.Monad.Primitive |
| 35 | +import Control.RefCount |
| 36 | +import Data.Primitive (Prim) |
| 37 | +import Data.Primitive.PrimVar |
| 38 | +import Database.LSMTree.Internal.Assertions (assert) |
| 39 | +import Database.LSMTree.Internal.Config |
| 40 | +import Database.LSMTree.Internal.Entry (NumEntries (..)) |
| 41 | +import Database.LSMTree.Internal.MergingRun (MergeCredits (..), |
| 42 | + MergeDebt (..), MergingRun) |
| 43 | +import qualified Database.LSMTree.Internal.MergingRun as MR |
| 44 | +import Database.LSMTree.Internal.Run (Run) |
| 45 | + |
| 46 | +import GHC.Exts (Word (W#), quotRemWord2#, timesWord2#) |
| 47 | + |
| 48 | +{------------------------------------------------------------------------------- |
| 49 | + Incoming runs |
| 50 | +-------------------------------------------------------------------------------} |
| 51 | + |
| 52 | +-- | An incoming run is either a single run, or a merge. |
| 53 | +data IncomingRun m h = |
| 54 | + Single !(Ref (Run m h)) |
| 55 | + | Merging !MergePolicyForLevel |
| 56 | + !NominalDebt |
| 57 | + !(PrimVar (PrimState m) NominalCredits) |
| 58 | + !(Ref (MergingRun MR.LevelMergeType m h)) |
| 59 | + |
| 60 | +data MergePolicyForLevel = LevelTiering | LevelLevelling |
| 61 | + deriving stock (Show, Eq) |
| 62 | + |
| 63 | +instance NFData MergePolicyForLevel where |
| 64 | + rnf LevelTiering = () |
| 65 | + rnf LevelLevelling = () |
| 66 | + |
| 67 | +{-# SPECIALISE duplicateIncomingRun :: ActionRegistry IO -> IncomingRun IO h -> IO (IncomingRun IO h) #-} |
| 68 | +duplicateIncomingRun :: |
| 69 | + (PrimMonad m, MonadMask m) |
| 70 | + => ActionRegistry m |
| 71 | + -> IncomingRun m h |
| 72 | + -> m (IncomingRun m h) |
| 73 | +duplicateIncomingRun reg (Single r) = |
| 74 | + Single <$> withRollback reg (dupRef r) releaseRef |
| 75 | + |
| 76 | +duplicateIncomingRun reg (Merging mp md mcv mr) = |
| 77 | + Merging mp md <$> (newPrimVar =<< readPrimVar mcv) |
| 78 | + <*> withRollback reg (dupRef mr) releaseRef |
| 79 | + |
| 80 | +{-# SPECIALISE releaseIncomingRun :: IncomingRun IO h -> IO () #-} |
| 81 | +releaseIncomingRun :: |
| 82 | + (PrimMonad m, MonadMask m) |
| 83 | + => IncomingRun m h -> m () |
| 84 | +releaseIncomingRun (Single r) = releaseRef r |
| 85 | +releaseIncomingRun (Merging _ _ _ mr) = releaseRef mr |
| 86 | + |
| 87 | +{-# INLINE newIncomingSingleRun #-} |
| 88 | +newIncomingSingleRun :: |
| 89 | + (PrimMonad m, MonadThrow m) |
| 90 | + => Ref (Run m h) |
| 91 | + -> m (IncomingRun m h) |
| 92 | +newIncomingSingleRun r = Single <$> dupRef r |
| 93 | + |
| 94 | +{-# INLINE newIncomingMergingRun #-} |
| 95 | +newIncomingMergingRun :: |
| 96 | + (PrimMonad m, MonadThrow m) |
| 97 | + => MergePolicyForLevel |
| 98 | + -> NominalDebt |
| 99 | + -> Ref (MergingRun MR.LevelMergeType m h) |
| 100 | + -> m (IncomingRun m h) |
| 101 | +newIncomingMergingRun mergePolicy nominalDebt mr = do |
| 102 | + nominalCreditsVar <- newPrimVar (NominalCredits 0) |
| 103 | + Merging mergePolicy nominalDebt nominalCreditsVar <$> dupRef mr |
| 104 | + |
| 105 | +{-# SPECIALISE snapshotIncomingRun :: |
| 106 | + IncomingRun IO h |
| 107 | + -> IO (Either (Ref (Run IO h)) |
| 108 | + (MergePolicyForLevel, |
| 109 | + NominalDebt, |
| 110 | + NominalCredits, |
| 111 | + Ref (MergingRun MR.LevelMergeType IO h))) #-} |
| 112 | +snapshotIncomingRun :: |
| 113 | + PrimMonad m |
| 114 | + => IncomingRun m h |
| 115 | + -> m (Either (Ref (Run m h)) |
| 116 | + (MergePolicyForLevel, |
| 117 | + NominalDebt, |
| 118 | + NominalCredits, |
| 119 | + Ref (MergingRun MR.LevelMergeType m h))) |
| 120 | +snapshotIncomingRun (Single r) = pure (Left r) |
| 121 | +snapshotIncomingRun (Merging mergePolicy nominalDebt nominalCreditsVar mr) = do |
| 122 | + nominalCredits <- readPrimVar nominalCreditsVar |
| 123 | + pure (Right (mergePolicy, nominalDebt, nominalCredits, mr)) |
| 124 | + |
| 125 | +{------------------------------------------------------------------------------- |
| 126 | + Credits |
| 127 | +-------------------------------------------------------------------------------} |
| 128 | + |
| 129 | +{- $credittracking |
| 130 | +
|
| 131 | +With scheduled merges, each update (e.g., insert) on a table contributes to the |
| 132 | +progression of ongoing merges in the levels structure. This ensures that merges |
| 133 | +are finished in time before a new merge has to be started. The points in the |
| 134 | +evolution of the levels structure where new merges are started are known: a |
| 135 | +flush of a full write buffer will create a new run on the first level, and |
| 136 | +after sufficient flushes (e.g., 4) we will start at least one new merge on the |
| 137 | +second level. This may cascade down to lower levels depending on how full the |
| 138 | +levels are. As such, we have a well-defined measure to determine when merges |
| 139 | +should be finished: it only depends on the maximum size of the write buffer! |
| 140 | +
|
| 141 | +The simplest solution to making sure merges are done in time is to step them to |
| 142 | +completion immediately when started. This does not, however, spread out work |
| 143 | +over time nicely. Instead, we schedule merge work based on how many updates are |
| 144 | +made on the table, taking care to ensure that the merge is finished /just/ in |
| 145 | +time before the next flush comes around, and not too early. |
| 146 | +
|
| 147 | +The progression is tracked using nominal credits. Each individual update |
| 148 | +contributes a single credit to each level, since each level contains precisely |
| 149 | +one ongoing merge. Contributing a credit does not, however, translate directly |
| 150 | +to performing one /unit/ of merging work: |
| 151 | +
|
| 152 | +* The amount of work to do for one credit is adjusted depending on the actual |
| 153 | + size of the merge we are doing. Last-level merges, for example, can have |
| 154 | + larger inputs, and therefore we have to do a little more work for each |
| 155 | + credit. Or input runs involved in a merge can be less than maximal size for |
| 156 | + the level, and so there may be less merging work to do. As such, we /scale/ |
| 157 | + 'NominalCredits' to 'MergeCredits', and then supply the 'MergeCredits' to |
| 158 | + the 'MergingRun'. |
| 159 | +
|
| 160 | +* Supplying 'MergeCredits' to a 'MergingRun' does not necessarily directly |
| 161 | + translate into performing merging work. Merge credits are accumulated until |
| 162 | + they go over a threshold, after which a batch of merge work will be performed. |
| 163 | + Configuring this threshold should allow a good balance between spreading out |
| 164 | + I\/O and achieving good (concurrent) performance. |
| 165 | +
|
| 166 | +Merging runs can be shared across tables, which means that multiple threads |
| 167 | +can contribute to the same merge concurrently. Incoming runs however are /not/ |
| 168 | +shared between tables. As such the tracking of 'NominalCredits' does not need |
| 169 | +to use any concurrency precautions. |
| 170 | +-} |
| 171 | + |
| 172 | +-- | Total merge debt to complete the merge in an incoming run. |
| 173 | +-- |
| 174 | +-- This corresponds to the number (worst case, minimum number) of update |
| 175 | +-- operations inserted into the table, before we will expect the merge to |
| 176 | +-- complete. |
| 177 | +newtype NominalDebt = NominalDebt Int |
| 178 | + deriving stock Eq |
| 179 | + deriving newtype (NFData) |
| 180 | + |
| 181 | +-- | Merge credits that get supplied to a table's levels. |
| 182 | +-- |
| 183 | +-- This corresponds to the number of update operations inserted into the table. |
| 184 | +newtype NominalCredits = NominalCredits Int |
| 185 | + deriving stock Eq |
| 186 | + deriving newtype (Prim, NFData) |
| 187 | + |
| 188 | +nominalDebtAsCredits :: NominalDebt -> NominalCredits |
| 189 | +nominalDebtAsCredits (NominalDebt c) = NominalCredits c |
| 190 | + |
| 191 | +{-# SPECIALISE supplyCreditsIncomingRun :: |
| 192 | + TableConfig |
| 193 | + -> LevelNo |
| 194 | + -> IncomingRun IO h |
| 195 | + -> NominalCredits |
| 196 | + -> IO () #-} |
| 197 | +-- | Supply a given number of nominal credits to the merge in an incoming run. |
| 198 | +-- This is a relative addition of credits, not a new absolute total value. |
| 199 | +supplyCreditsIncomingRun :: |
| 200 | + (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) |
| 201 | + => TableConfig |
| 202 | + -> LevelNo |
| 203 | + -> IncomingRun m h |
| 204 | + -> NominalCredits |
| 205 | + -> m () |
| 206 | +supplyCreditsIncomingRun _ _ (Single _r) _ = return () |
| 207 | +supplyCreditsIncomingRun conf ln (Merging _ nominalDebt nominalCreditsVar mr) |
| 208 | + deposit = do |
| 209 | + (_nominalCredits, |
| 210 | + nominalCredits') <- depositNominalCredits nominalDebt nominalCreditsVar |
| 211 | + deposit |
| 212 | + let !mergeDebt = MR.totalMergeDebt mr |
| 213 | + !mergeCredits' = scaleNominalToMergeCredit nominalDebt mergeDebt |
| 214 | + nominalCredits' |
| 215 | + !thresh = creditThresholdForLevel conf ln |
| 216 | + (_suppliedCredits, |
| 217 | + _suppliedCredits') <- MR.supplyCreditsAbsolute mr thresh mergeCredits' |
| 218 | + return () |
| 219 | + --TODO: currently each supplying credits action results in contributing |
| 220 | + -- credits to the underlying merge, but this need not be the case. We |
| 221 | + -- _could_ do threshold based batching at the level of the IncomingRun. |
| 222 | + -- The IncomingRun does not need to worry about concurrency, so does not |
| 223 | + -- pay the cost of atomic operations on the counters. Then when we |
| 224 | + -- accumulate a batch we could supply that to the MergingRun (which must |
| 225 | + -- use atomic operations for its counters). We could potentially simplify |
| 226 | + -- MergingRun by dispensing with batching for the MergeCredits counters. |
| 227 | + |
| 228 | +-- TODO: the thresholds for doing merge work should be different for each level, |
| 229 | +-- maybe co-prime? |
| 230 | +creditThresholdForLevel :: TableConfig -> LevelNo -> MR.CreditThreshold |
| 231 | +creditThresholdForLevel conf (LevelNo _i) = |
| 232 | + let AllocNumEntries (NumEntries x) = confWriteBufferAlloc conf |
| 233 | + in MR.CreditThreshold (MR.UnspentCredits (MergeCredits x)) |
| 234 | + |
| 235 | +-- | Deposit nominal credits in the local credits var, ensuring the total |
| 236 | +-- credits does not exceed the total debt. |
| 237 | +-- |
| 238 | +-- Depositing /could/ leave the credit higher than the total debt. It is not |
| 239 | +-- avoided by construction. The scenario is this: when a completed merge is |
| 240 | +-- underfull, we combine it with the incoming run, so it means we have one run |
| 241 | +-- fewer on the level then we'd normally have. This means that the level |
| 242 | +-- becomes full at a later time, so more time passes before we call |
| 243 | +-- 'MR.expectCompleted' on any levels further down the tree. This means we keep |
| 244 | +-- supplying nominal credits to levels further down past the point their |
| 245 | +-- nominal debt is paid off. So the solution here is just to drop any nominal |
| 246 | +-- credits that are in excess of the nominal debt. |
| 247 | +-- |
| 248 | +-- This is /not/ itself thread safe. All 'TableContent' update operations are |
| 249 | +-- expected to be serialised by the caller. See concurrency comments for |
| 250 | +-- 'TableContent' for detail. |
| 251 | +depositNominalCredits :: |
| 252 | + PrimMonad m |
| 253 | + => NominalDebt |
| 254 | + -> PrimVar (PrimState m) NominalCredits |
| 255 | + -> NominalCredits |
| 256 | + -> m (NominalCredits, NominalCredits) |
| 257 | +depositNominalCredits (NominalDebt nominalDebt) |
| 258 | + nominalCreditsVar |
| 259 | + (NominalCredits deposit) = do |
| 260 | + NominalCredits before <- readPrimVar nominalCreditsVar |
| 261 | + let !after = NominalCredits (min (before + deposit) nominalDebt) |
| 262 | + writePrimVar nominalCreditsVar after |
| 263 | + return (NominalCredits before, after) |
| 264 | + |
| 265 | +-- | Linearly scale a nominal credit (between 0 and the nominal debt) into an |
| 266 | +-- equivalent merge credit (between 0 and the total merge debt). |
| 267 | +-- |
| 268 | +-- Crucially, @100% nominal credit ~~ 100% merge credit@, so when we pay off |
| 269 | +-- the nominal debt, we also exactly pay off the merge debt. That is: |
| 270 | +-- |
| 271 | +-- > scaleNominalToMergeCredit nominalDebt mergeDebt nominalDebt == mergeDebt |
| 272 | +-- |
| 273 | +-- (modulo some newtype conversions) |
| 274 | +-- |
| 275 | +scaleNominalToMergeCredit :: |
| 276 | + NominalDebt |
| 277 | + -> MergeDebt |
| 278 | + -> NominalCredits |
| 279 | + -> MergeCredits |
| 280 | +scaleNominalToMergeCredit (NominalDebt nominalDebt) |
| 281 | + (MergeDebt (MergeCredits mergeDebt)) |
| 282 | + (NominalCredits nominalCredits) = |
| 283 | + -- The scaling involves an operation: (a * b) `div` c |
| 284 | + -- but where potentially the variables a,b,c may be bigger than a 32bit |
| 285 | + -- integer can hold. This would be the case for runs that have more than |
| 286 | + -- 4 billion entries. |
| 287 | + -- |
| 288 | + -- (This is assuming 64bit Int, the problem would be even worse for 32bit |
| 289 | + -- systems. The solution here would also work for 32bit systems, allowing |
| 290 | + -- up to, 2^31, 2 billion entries per run.) |
| 291 | + -- |
| 292 | + -- To work correctly in this case we need higher range for the intermediate |
| 293 | + -- result a*b which could be bigger than 64bits can hold. A correct |
| 294 | + -- implementation can use Rational, but a fast implementation should use |
| 295 | + -- only integer operations. This is relevant because this is on the fast |
| 296 | + -- path for small insertions into the table that often do no merging work |
| 297 | + -- and just update credit counters. |
| 298 | + |
| 299 | + -- The fast implementation uses integer operations that produce a 128bit |
| 300 | + -- intermediate result for the a*b result, and use a 128bit numerator in |
| 301 | + -- the division operation (but 64bit denominator). These are known as |
| 302 | + -- "widening multiplication" and "narrowing division". GHC has direct |
| 303 | + -- support for these operations as primops: timesWord2# and quotRemWord2#, |
| 304 | + -- but they are not exposed through any high level API shipped with GHC. |
| 305 | + |
| 306 | + -- The specification using Rational is: |
| 307 | + let mergeCredits_spec = floor $ toRational nominalCredits |
| 308 | + * toRational mergeDebt |
| 309 | + / toRational nominalDebt |
| 310 | + -- Note that it doesn't matter if we use floor or ceiling here. |
| 311 | + -- Rounding errors will not compound because we sum nominal debt and |
| 312 | + -- convert absolute nominal to absolute merging credit. We don't |
| 313 | + -- convert each deposit and sum all the rounding errors. |
| 314 | + -- When nominalCredits == nominalDebt then the result is exact anyway |
| 315 | + -- (being mergeDebt) so the rounding mode makes no difference when we |
| 316 | + -- get to the end of the merge. Using floor makes things simpler for |
| 317 | + -- the fast integer implementation below, so we take that as the spec. |
| 318 | + |
| 319 | + -- If the nominalCredits is between 0 and nominalDebt then it's |
| 320 | + -- guaranteed that the mergeCredit is between 0 and mergeDebt. |
| 321 | + -- The mergeDebt fits in an Int, therefore the result does too. |
| 322 | + -- Therefore the undefined behaviour case of timesDivABC_fast is |
| 323 | + -- avoided and the w2i cannot overflow. |
| 324 | + mergeCredits_fast = w2i $ timesDivABC_fast (i2w nominalCredits) |
| 325 | + (i2w mergeDebt) |
| 326 | + (i2w nominalDebt) |
| 327 | + in assert (0 < nominalDebt) $ |
| 328 | + assert (0 <= nominalCredits && nominalCredits <= nominalDebt) $ |
| 329 | + assert (mergeCredits_spec == mergeCredits_fast) $ |
| 330 | + MergeCredits mergeCredits_fast |
| 331 | + where |
| 332 | + {-# INLINE i2w #-} |
| 333 | + {-# INLINE w2i #-} |
| 334 | + i2w :: Int -> Word |
| 335 | + w2i :: Word -> Int |
| 336 | + i2w = fromIntegral |
| 337 | + w2i = fromIntegral |
| 338 | + |
| 339 | +-- | Compute @(a * b) `div` c@ for unsigned integers for the full range of |
| 340 | +-- 64bit unsigned integers, provided that @a <= c@ and thus the result will |
| 341 | +-- fit in 64bits. |
| 342 | +-- |
| 343 | +-- The @a * b@ intermediate result is computed using 128bit precision. |
| 344 | +-- |
| 345 | +-- Note: the behaviour is undefined if the result will not fit in 64bits. |
| 346 | +-- It will probably result in immediate termination with SIGFPE. |
| 347 | +-- |
| 348 | +timesDivABC_fast :: Word -> Word -> Word -> Word |
| 349 | +timesDivABC_fast (W# a) (W# b) (W# c) = |
| 350 | + case timesWord2# a b of |
| 351 | + (# ph, pl #) -> |
| 352 | + case quotRemWord2# ph pl c of |
| 353 | + (# q, _r #) -> W# q |
| 354 | + |
| 355 | +{-# SPECIALISE immediatelyCompleteIncomingRun :: |
| 356 | + TableConfig |
| 357 | + -> LevelNo |
| 358 | + -> IncomingRun IO h |
| 359 | + -> IO (Ref (Run IO h)) #-} |
| 360 | +-- | Supply enough credits to complete the merge now. |
| 361 | +immediatelyCompleteIncomingRun :: |
| 362 | + (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) |
| 363 | + => TableConfig |
| 364 | + -> LevelNo |
| 365 | + -> IncomingRun m h |
| 366 | + -> m (Ref (Run m h)) |
| 367 | +immediatelyCompleteIncomingRun conf ln ir = |
| 368 | + case ir of |
| 369 | + Single r -> dupRef r |
| 370 | + Merging _ (NominalDebt nominalDebt) nominalCreditsVar mr -> do |
| 371 | + |
| 372 | + NominalCredits nominalCredits <- readPrimVar nominalCreditsVar |
| 373 | + let !deposit = NominalCredits (nominalDebt - nominalCredits) |
| 374 | + supplyCreditsIncomingRun conf ln ir deposit |
| 375 | + |
| 376 | + -- This ensures the merge is really completed. However, we don't |
| 377 | + -- release the merge yet, but we do return a new reference to the run. |
| 378 | + MR.expectCompleted mr |
0 commit comments