@@ -231,6 +231,14 @@ duplicateRuns (DeRef mr) =
231231 V. mapM (\ r -> withRollback reg (dupRef r) releaseRef) rs
232232
233233-- | Take a snapshot of the state of a merging run.
234+ --
235+ -- TODO: this is not concurrency safe! The input runs to the merging run could
236+ -- be released concurrently by another thread that completes the merge, while
237+ -- the snapshot is taking place. The solution is for snapshot here to duplicate
238+ -- the runs it returns _while_ holding the mergeState MVar (to exclude threads
239+ -- that might concurrently complete the merge). And then the caller of course
240+ -- must be updated to release the extra references.
241+ --
234242snapshot ::
235243 (PrimMonad m , MonadMVar m )
236244 => Ref (MergingRun m h )
@@ -267,7 +275,7 @@ work to do).
267275The implementation is similar but somewhat more complex. We also accumulate
268276unspent credits until they reach a threshold at which point we do a batch of
269277merging work. Unlike the prototype, the implementation tracks both credits
270- spent credits as yet unspent. We will elaborate on why and how below.
278+ spent and credits as yet unspent. We will elaborate on why and how below.
271279
272280In the prototype, the credits spent equals the merge steps performed. The
273281same holds in the real implementation, but making it so is more complicated.
@@ -296,7 +304,8 @@ Thus we track two things:
296304 * credits unspent ('UnspentCredits'): credits supplied that are not yet spent
297305 and are thus available to spend.
298306
299- The credits supplied is the sum of the credits spent and unspent.
307+ The credits supplied is the sum of the credits spent and unspent. We guarantee
308+ that the supplied credits never exceed the total debt.
300309
301310The credits spent and the steps performed (or in the process of being
302311performed) will typically be equal. They are not guaranteed to be equal in the
@@ -330,7 +339,7 @@ numEntriesToTotalDebt (NumEntries n) = Credits n
330339-- Note that ideally the batch size for different LSM levels should be
331340-- co-prime so that merge work at different levels is not synchronised.
332341--
333- newtype CreditThreshold = CreditThreshold Credits
342+ newtype CreditThreshold = CreditThreshold UnspentCredits
334343
335344-- | The supplied credits is simply the sum of all the credits that have been
336345-- (successfully) supplied to a merging run via 'supplyCredits'.
@@ -559,8 +568,8 @@ atomicDepositAndSpendCredits (CreditsVar !var) !totalDebt
559568
560569 -- 2. not case 1, but enough unspent credits have accumulated to do
561570 -- a batch of merge work;
562- | ( \ ( UnspentCredits x) -> x) unspent' >= batchThreshold
563- = spendBatchCredits spent unspent'
571+ | unspent' >= batchThreshold
572+ = spendBatchCredits spent unspent' batchThreshold
564573
565574 -- 3. not case 1 or 2, not enough credits to do any merge work.
566575 | otherwise
@@ -587,14 +596,15 @@ atomicDepositAndSpendCredits (CreditsVar !var) !totalDebt
587596 assert (leftover >= 0 ) $
588597 (supplied', UnspentCredits unspent', leftover)
589598
590- spendBatchCredits (SpentCredits ! spent) (UnspentCredits ! unspent) =
599+ spendBatchCredits (SpentCredits ! spent) (UnspentCredits ! unspent)
600+ (UnspentCredits unspentBatchThreshold) =
591601 -- nBatches may be zero, in which case spend will be zero
592- let ! nBatches = unspent `div` batchThreshold
593- ! spend = nBatches * batchThreshold
602+ let ! nBatches = unspent `div` unspentBatchThreshold
603+ ! spend = nBatches * unspentBatchThreshold
594604 ! spent' = spent + spend
595605 ! unspent' = unspent - spend
596606 in assert (spend >= 0 ) $
597- assert (unspent' < batchThreshold ) $
607+ assert (unspent' < unspentBatchThreshold ) $
598608 assert (spent' + unspent' == spent + unspent) $
599609 (spend, SpentCredits spent', UnspentCredits unspent')
600610
@@ -702,11 +712,10 @@ performMergeSteps ::
702712 -> Credits
703713 -> m Bool
704714performMergeSteps mergeVar creditsVar (Credits credits) =
715+ assert (credits >= 0 ) $
705716 withMVar mergeVar $ \ case
706717 CompletedMerge {} -> pure False
707718 OngoingMerge _rs m -> do
708- -- We have dealt with the case of credits <= 0 above,
709- -- so here we know credits is positive
710719 let stepsToDo = credits
711720 (stepsDone, stepResult) <- Merge. steps m stepsToDo
712721 assert (stepResult == MergeDone || stepsDone >= stepsToDo) (pure () )
@@ -743,8 +752,9 @@ completeMerge mergeVar mergeKnownCompletedVar = do
743752 (OngoingMerge rs m) -> do
744753 -- first try to complete the merge before performing other side effects,
745754 -- in case the completion fails
746- -- TODO: Run.fromMutable claims not to be exception safe
747- -- may need to use uninteruptible mask
755+ -- TODO: Run.fromMutable (used in Merge.complete) claims not to be
756+ -- exception safe so we should probably be using the resource registry
757+ -- and test for exception safety.
748758 r <- Merge. complete m
749759 V. forM_ rs releaseRef
750760 -- Cache the knowledge that we completed the merge
@@ -768,16 +778,14 @@ expectCompleted (DeRef MergingRun {..}) = do
768778 let totalDebt = numEntriesToTotalDebt mergeNumEntries
769779 suppliedCredits = spentCredits + unspentCredits
770780 ! credits = assert (suppliedCredits == totalDebt) $
781+ assert (unspentCredits >= 0 ) $
771782 unspentCredits
772783
773- -- TODO: what about exception safety: check if it is ok to be interrupted
774- -- between performMergeSteps and completeMerge here, and above.
775784 weFinishedMerge <- performMergeSteps mergeState mergeCreditsVar credits
785+ -- If an async exception happens before we get to perform the
786+ -- completion, then that is fine. The next 'expectCompleted' will
787+ -- complete the merge.
776788 when weFinishedMerge $ completeMerge mergeState mergeKnownCompleted
777- -- TODO: can we think of a check to see if we did not do too much work
778- -- here? <-- assert (suppliedCredits == totalDebt) ought to do it!
779- -- A related question is if we finished the merge too early, could have
780- -- spread out the work better.
781789 withMVar mergeState $ \ case
782790 CompletedMerge r -> dupRef r -- return a fresh reference to the run
783791 OngoingMerge {} -> do
0 commit comments