Skip to content

Commit 318e9a5

Browse files
facundominguezmrBlissamesgen
authored andcommitted
ChainDB: let the BlockFetch client add blocks asynchronously
Port of IntersectMBO/ouroboros-network#2721 Co-authored-by: Thomas Winant <[email protected]> Co-authored-by: Alexander Esgen <[email protected]>
1 parent 4e91897 commit 318e9a5

File tree

7 files changed

+114
-38
lines changed

7 files changed

+114
-38
lines changed

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ library
292292
io-classes ^>=1.5,
293293
measures,
294294
mtl,
295+
multiset ^>=0.3,
295296
nothunks ^>=0.2,
296297
ouroboros-network-api ^>=0.11,
297298
ouroboros-network-mock ^>=0.1,

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
3131
(LedgerSupportsProtocol)
3232
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
3333
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as CSJumping
34-
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
34+
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise,
35+
ChainDB)
3536
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
3637
import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment
3738
(InvalidBlockPunishment)
@@ -57,16 +58,16 @@ data ChainDbView m blk = ChainDbView {
5758
getCurrentChain :: STM m (AnchoredFragment (Header blk))
5859
, getIsFetched :: STM m (Point blk -> Bool)
5960
, getMaxSlotNo :: STM m MaxSlotNo
60-
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
61+
, addBlockAsync :: InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk)
6162
, getChainSelStarvation :: STM m ChainSelStarvation
6263
}
6364

64-
defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
65+
defaultChainDbView :: ChainDB m blk -> ChainDbView m blk
6566
defaultChainDbView chainDB = ChainDbView {
6667
getCurrentChain = ChainDB.getCurrentChain chainDB
6768
, getIsFetched = ChainDB.getIsFetched chainDB
6869
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
69-
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
70+
, addBlockAsync = ChainDB.addBlockAsync chainDB
7071
, getChainSelStarvation = ChainDB.getChainSelStarvation chainDB
7172
}
7273

@@ -217,8 +218,8 @@ mkBlockFetchConsensusInterface
217218
pipeliningPunishment <- InvalidBlockPunishment.mkForDiffusionPipelining
218219
pure $ mkAddFetchedBlock_ pipeliningPunishment pipelining
219220

220-
-- Waits until the block has been written to disk, but not until chain
221-
-- selection has processed the block.
221+
-- Hand over the block to the ChainDB, but don't wait until it has been
222+
-- written to disk or processed.
222223
mkAddFetchedBlock_ ::
223224
( BlockConfig blk
224225
-> Header blk
@@ -262,7 +263,7 @@ mkBlockFetchConsensusInterface
262263
DiffusionPipeliningOff -> disconnect
263264
DiffusionPipeliningOn ->
264265
pipeliningPunishment bcfg (getHeader blk) disconnect
265-
addBlockWaitWrittenToDisk
266+
addBlockAsync
266267
chainDB
267268
punishment
268269
blk

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,9 @@ data ChainDB m blk = ChainDB {
212212
, getBlockComponent :: forall b. BlockComponent blk b
213213
-> RealPoint blk -> m (Maybe b)
214214

215-
-- | Return membership check function for recent blocks
215+
-- | Return membership check function for recent blocks. This includes
216+
-- blocks in the VolatileDB and blocks that are currently being processed
217+
-- or are waiting in a queue to be processed.
216218
--
217219
-- This check is only reliable for blocks up to @k@ away from the tip.
218220
-- For blocks older than that the results should be regarded as
@@ -238,7 +240,8 @@ data ChainDB m blk = ChainDB {
238240
-- are part of a shorter fork.
239241
, getIsValid :: STM m (RealPoint blk -> Maybe Bool)
240242

241-
-- | Get the highest slot number stored in the ChainDB.
243+
-- | Get the highest slot number stored in the ChainDB (this includes
244+
-- blocks that are waiting in the background queue to be processed).
242245
--
243246
-- Note that the corresponding block doesn't have to be part of the
244247
-- current chain, it could be part of some fork, or even be a

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
541541
ChainSelAddBlock BlockToAdd{blockToAdd} ->
542542
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
543543
blockRealPoint blockToAdd
544-
chainSelSync cdb message)
544+
chainSelSync cdb message
545+
lift $ atomically $ processedChainSelMessage cdbChainSelQueue message)
545546
where
546547
starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -164,18 +164,15 @@ getBlockComponent ::
164164
getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB
165165

166166
getIsFetched ::
167-
forall m blk. IOLike m
167+
forall m blk. (IOLike m, HasHeader blk)
168168
=> ChainDbEnv m blk -> STM m (Point blk -> Bool)
169-
getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB
170-
where
171-
-- The volatile DB indexes by hash only, not by points. However, it should
172-
-- not be possible to have two points with the same hash but different
173-
-- slot numbers.
174-
basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool
175-
basedOnHash f p =
176-
case pointHash p of
177-
BlockHash hash -> f hash
178-
GenesisHash -> False
169+
getIsFetched CDB{..} = do
170+
checkQueue <- memberChainSelQueue cdbChainSelQueue
171+
checkVolDb <- VolatileDB.getIsMember cdbVolatileDB
172+
return $ \pt ->
173+
case pointToWithOriginRealPoint pt of
174+
Origin -> False
175+
NotOrigin pt' -> checkQueue pt' || checkVolDb (realPointHash pt')
179176

180177
getIsInvalidBlock ::
181178
forall m blk. (IOLike m, HasHeader blk)
@@ -218,10 +215,13 @@ getMaxSlotNo CDB{..} = do
218215
-- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot
219216
-- of the current chain will be 10 (being the anchor point of the empty
220217
-- current chain), while the max slot of the VolatileDB will be 9.
218+
--
219+
-- Moreover, we have to look in 'ChainSelQueue' too.
221220
curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot
222221
<$> readTVar cdbChain
223-
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
224-
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo
222+
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
223+
queuedMaxSlotNo <- getMaxSlotNoChainSelQueue cdbChainSelQueue
224+
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` queuedMaxSlotNo
225225

226226
{-------------------------------------------------------------------------------
227227
Unifying interface over the immutable DB and volatile DB, but independent

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE BangPatterns #-}
2+
{-# LANGUAGE BlockArguments #-}
23
{-# LANGUAGE DataKinds #-}
34
{-# LANGUAGE DeriveAnyClass #-}
45
{-# LANGUAGE DeriveGeneric #-}
@@ -42,12 +43,15 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
4243
-- * Blocks to add
4344
, BlockToAdd (..)
4445
, ChainSelMessage (..)
45-
, ChainSelQueue
46+
, ChainSelQueue -- opaque
4647
, addBlockToAdd
4748
, addReprocessLoEBlocks
4849
, closeChainSelQueue
4950
, getChainSelMessage
51+
, getMaxSlotNoChainSelQueue
52+
, memberChainSelQueue
5053
, newChainSelQueue
54+
, processedChainSelMessage
5155
-- * Trace types
5256
, SelectionChangedInfo (..)
5357
, TraceAddBlockEvent (..)
@@ -63,14 +67,15 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
6367
, TraceValidationEvent (..)
6468
) where
6569

66-
import Cardano.Prelude (whenM)
6770
import Control.Monad (when)
6871
import Control.ResourceRegistry
6972
import Control.Tracer
7073
import Data.Foldable (traverse_)
7174
import Data.Map.Strict (Map)
7275
import Data.Maybe (mapMaybe)
7376
import Data.Maybe.Strict (StrictMaybe (..))
77+
import Data.MultiSet (MultiSet)
78+
import qualified Data.MultiSet as MultiSet
7479
import Data.Set (Set)
7580
import Data.Typeable
7681
import Data.Void (Void)
@@ -107,7 +112,7 @@ import Ouroboros.Consensus.Util.Enclose (Enclosing, Enclosing' (..))
107112
import Ouroboros.Consensus.Util.IOLike
108113
import Ouroboros.Consensus.Util.STM (WithFingerprint)
109114
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
110-
import Ouroboros.Network.Block (MaxSlotNo)
115+
import Ouroboros.Network.Block (MaxSlotNo (..))
111116
import Ouroboros.Network.BlockFetch.ConsensusInterface
112117
(ChainSelStarvation (..))
113118

@@ -419,7 +424,19 @@ data InvalidBlockInfo blk = InvalidBlockInfo
419424
-- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are
420425
-- read from this queue by a background thread, which processes the blocks
421426
-- synchronously.
422-
newtype ChainSelQueue m blk = ChainSelQueue (TBQueue m (ChainSelMessage m blk))
427+
--
428+
-- We also maintain a multiset of the points of all of the blocks in the queue,
429+
-- plus potentially the one block for which chain selection is currently in
430+
-- progress. It is used to account for queued blocks in eg 'getIsFetched' and
431+
-- 'getMaxSlotNo'.
432+
--
433+
-- INVARIANT: Counted with multiplicity, @varChainSelPoints@ contains exactly
434+
-- the same hashes or at most one additional hash compared to the hashes of
435+
-- blocks in @varChainSelQueue@.
436+
data ChainSelQueue m blk = ChainSelQueue {
437+
varChainSelQueue :: TBQueue m (ChainSelMessage m blk)
438+
, varChainSelPoints :: StrictTVar m (MultiSet (RealPoint blk))
439+
}
423440
deriving NoThunks via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk)
424441

425442
-- | Entry in the 'ChainSelQueue' queue: a block together with the 'TMVar's used
@@ -445,9 +462,14 @@ data ChainSelMessage m blk
445462
-- ^ Used for 'ChainSelectionPromise'.
446463

447464
-- | Create a new 'ChainSelQueue' with the given size.
448-
newChainSelQueue :: IOLike m => Word -> m (ChainSelQueue m blk)
449-
newChainSelQueue queueSize = ChainSelQueue <$>
450-
atomically (newTBQueue (fromIntegral queueSize))
465+
newChainSelQueue :: (IOLike m, StandardHash blk, Typeable blk) => Word -> m (ChainSelQueue m blk)
466+
newChainSelQueue chainSelQueueCapacity = do
467+
varChainSelQueue <- newTBQueueIO (fromIntegral chainSelQueueCapacity)
468+
varChainSelPoints <- newTVarIO MultiSet.empty
469+
pure ChainSelQueue {
470+
varChainSelQueue
471+
, varChainSelPoints
472+
}
451473

452474
-- | Add a block to the 'ChainSelQueue' queue. Can block when the queue is full.
453475
addBlockToAdd ::
@@ -457,7 +479,7 @@ addBlockToAdd ::
457479
-> InvalidBlockPunishment m
458480
-> blk
459481
-> m (AddBlockPromise m blk)
460-
addBlockToAdd tracer (ChainSelQueue queue) punish blk = do
482+
addBlockToAdd tracer (ChainSelQueue {varChainSelQueue, varChainSelPoints}) punish blk = do
461483
varBlockWrittenToDisk <- newEmptyTMVarIO
462484
varBlockProcessed <- newEmptyTMVarIO
463485
let !toAdd = BlockToAdd
@@ -466,10 +488,12 @@ addBlockToAdd tracer (ChainSelQueue queue) punish blk = do
466488
, varBlockWrittenToDisk
467489
, varBlockProcessed
468490
}
469-
traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) RisingEdge
491+
pt = blockRealPoint blk
492+
traceWith tracer $ AddedBlockToQueue pt RisingEdge
470493
queueSize <- atomically $ do
471-
writeTBQueue queue (ChainSelAddBlock toAdd)
472-
lengthTBQueue queue
494+
writeTBQueue varChainSelQueue (ChainSelAddBlock toAdd)
495+
modifyTVar varChainSelPoints $ MultiSet.insert pt
496+
lengthTBQueue varChainSelQueue
473497
traceWith tracer $
474498
AddedBlockToQueue (blockRealPoint blk) (FallingEdgeWith (fromIntegral queueSize))
475499
return AddBlockPromise
@@ -483,11 +507,12 @@ addReprocessLoEBlocks
483507
=> Tracer m (TraceAddBlockEvent blk)
484508
-> ChainSelQueue m blk
485509
-> m (ChainSelectionPromise m)
486-
addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
510+
addReprocessLoEBlocks tracer ChainSelQueue {varChainSelQueue} = do
487511
varProcessed <- newEmptyTMVarIO
488512
let waitUntilRan = atomically $ readTMVar varProcessed
489513
traceWith tracer $ AddedReprocessLoEBlocksToQueue
490-
atomically $ writeTBQueue queue $ ChainSelReprocessLoEBlocks varProcessed
514+
atomically $ writeTBQueue varChainSelQueue $
515+
ChainSelReprocessLoEBlocks varProcessed
491516
return $ ChainSelectionPromise waitUntilRan
492517

493518
-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
@@ -499,7 +524,7 @@ getChainSelMessage
499524
-> StrictTVar m ChainSelStarvation
500525
-> ChainSelQueue m blk
501526
-> m (ChainSelMessage m blk)
502-
getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) =
527+
getChainSelMessage starvationTracer starvationVar chainSelQueue =
503528
atomically (tryReadTBQueue' queue) >>= \case
504529
Just msg -> pure msg
505530
Nothing -> do
@@ -508,6 +533,10 @@ getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) =
508533
terminateStarvationMeasure msg
509534
pure msg
510535
where
536+
ChainSelQueue {
537+
varChainSelQueue = queue
538+
} = chainSelQueue
539+
511540
startStarvationMeasure :: m ()
512541
startStarvationMeasure = do
513542
prevStarvation <- atomically $ swapTVar starvationVar ChainSelStarvationOngoing
@@ -531,7 +560,7 @@ tryReadTBQueue' q = (Just <$> readTBQueue q) `orElse` pure Nothing
531560
-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
532561
--
533562
closeChainSelQueue :: IOLike m => ChainSelQueue m blk -> STM m ()
534-
closeChainSelQueue (ChainSelQueue queue) = do
563+
closeChainSelQueue ChainSelQueue{varChainSelQueue = queue} = do
535564
as <- mapMaybe blockAdd <$> flushTBQueue queue
536565
traverse_ (\a -> tryPutTMVar (varBlockProcessed a)
537566
(FailedToAddBlock "Queue flushed"))
@@ -541,6 +570,41 @@ closeChainSelQueue (ChainSelQueue queue) = do
541570
ChainSelAddBlock ab -> Just ab
542571
ChainSelReprocessLoEBlocks _ -> Nothing
543572

573+
-- | To invoke when the given 'ChainSelMessage' has been processed by ChainSel.
574+
-- This is used to remove the respective point from the multiset of points in
575+
-- the 'ChainSelQueue' (as the block has now been written to disk by ChainSel).
576+
processedChainSelMessage ::
577+
(IOLike m, HasHeader blk)
578+
=> ChainSelQueue m blk
579+
-> ChainSelMessage m blk
580+
-> STM m ()
581+
processedChainSelMessage ChainSelQueue {varChainSelPoints} = \case
582+
ChainSelAddBlock BlockToAdd{blockToAdd = blk} ->
583+
modifyTVar varChainSelPoints $ MultiSet.delete (blockRealPoint blk)
584+
ChainSelReprocessLoEBlocks{} ->
585+
pure ()
586+
587+
-- | Return a function to test the membership
588+
memberChainSelQueue ::
589+
(IOLike m, HasHeader blk)
590+
=> ChainSelQueue m blk
591+
-> STM m (RealPoint blk -> Bool)
592+
memberChainSelQueue ChainSelQueue {varChainSelPoints} =
593+
flip MultiSet.member <$> readTVar varChainSelPoints
594+
595+
getMaxSlotNoChainSelQueue ::
596+
IOLike m
597+
=> ChainSelQueue m blk
598+
-> STM m MaxSlotNo
599+
getMaxSlotNoChainSelQueue ChainSelQueue {varChainSelPoints} =
600+
aux <$> readTVar varChainSelPoints
601+
where
602+
-- | The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the
603+
-- maximal key of the map has the greatest 'SlotNo'.
604+
aux :: MultiSet (RealPoint blk) -> MaxSlotNo
605+
aux pts = case MultiSet.maxView pts of
606+
Nothing -> NoMaxSlotNo
607+
Just (RealPoint s _, _) -> MaxSlotNo s
544608

545609
{-------------------------------------------------------------------------------
546610
Trace types

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import Data.Bimap (Bimap)
2323
import qualified Data.Bimap as Bimap
2424
import Data.IntPSQ (IntPSQ)
2525
import qualified Data.IntPSQ as PSQ
26+
import Data.MultiSet (MultiSet)
27+
import qualified Data.MultiSet as MultiSet
2628
import Data.SOP.BasicFunctors
2729
import NoThunks.Class (InspectHeap (..), InspectHeapNamed (..),
2830
NoThunks (..), OnlyCheckWhnfNamed (..), allNoThunks,
@@ -75,6 +77,10 @@ instance NoThunks a => NoThunks (K a b) where
7577
showTypeOf _ = showTypeOf (Proxy @a)
7678
wNoThunks ctxt (K a) = wNoThunks ("K":ctxt) a
7779

80+
instance NoThunks a => NoThunks (MultiSet a) where
81+
showTypeOf _ = "MultiSet"
82+
wNoThunks ctxt = wNoThunks ctxt . MultiSet.toMap
83+
7884
{-------------------------------------------------------------------------------
7985
fs-api
8086
-------------------------------------------------------------------------------}

0 commit comments

Comments
 (0)