Skip to content

Commit 7d02cc5

Browse files
committed
ChainDB: implement chain selection for certificates
1 parent bc2ec2c commit 7d02cc5

File tree

6 files changed

+170
-17
lines changed

6 files changed

+170
-17
lines changed

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ module Ouroboros.Consensus.Storage.ChainDB.API
2525
, addBlockWaitWrittenToDisk
2626
, addBlock_
2727

28+
-- * Adding a Peras certificate
29+
, AddPerasCertPromise (..)
30+
, addPerasCertSync
31+
2832
-- * Trigger chain selection
2933
, ChainSelectionPromise (..)
3034
, triggerChainSelection
@@ -387,7 +391,7 @@ data ChainDB m blk = ChainDB
387391
, getStatistics :: m (Maybe Statistics)
388392
-- ^ Get statistics from the LedgerDB, in particular the number of entries
389393
-- in the tables.
390-
, addPerasCert :: PerasCert blk -> m ()
394+
, addPerasCertAsync :: PerasCert blk -> m (AddPerasCertPromise m)
391395
-- ^ TODO
392396
, getPerasWeightSnapshot :: STM m (WithFingerprint (PerasWeightSnapshot blk))
393397
-- ^ TODO
@@ -510,6 +514,23 @@ triggerChainSelection :: IOLike m => ChainDB m blk -> m ()
510514
triggerChainSelection chainDB =
511515
waitChainSelectionPromise =<< chainSelAsync chainDB
512516

517+
{-------------------------------------------------------------------------------
518+
Adding a Peras certificate
519+
-------------------------------------------------------------------------------}
520+
521+
newtype AddPerasCertPromise m = AddPerasCertPromise
522+
{ waitPerasCertProcessed :: m ()
523+
-- ^ Wait until the Peras certificate has been processed (which potentially
524+
-- includes switching to a different chain). If the PerasCertDB did already
525+
-- contain a certificate for this round, the certificate is ignored (as the
526+
-- two certificates must be identical because certificate equivocation is
527+
-- impossible).
528+
}
529+
530+
addPerasCertSync :: IOLike m => ChainDB m blk -> PerasCert blk -> m ()
531+
addPerasCertSync chainDB cert =
532+
waitPerasCertProcessed =<< addPerasCertAsync chainDB cert
533+
513534
{-------------------------------------------------------------------------------
514535
Serialised block/header with its point
515536
-------------------------------------------------------------------------------}

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl
1616
-- * Trace types
1717
, SelectionChangedInfo (..)
1818
, TraceAddBlockEvent (..)
19+
, TraceAddPerasCertEvent (..)
1920
, TraceChainSelStarvationEvent (..)
2021
, TraceCopyToImmutableDBEvent (..)
2122
, TraceEvent (..)
@@ -278,10 +279,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
278279
, getHeaderStateHistory = getEnvSTM h Query.getHeaderStateHistory
279280
, getReadOnlyForkerAtPoint = getEnv2 h Query.getReadOnlyForkerAtPoint
280281
, getStatistics = getEnv h Query.getStatistics
281-
, addPerasCert = getEnv1 h $ \cdb@CDB{..} cert -> do
282-
_ <- PerasCertDB.addCert cdbPerasCertDB cert
283-
-- TODO trigger chain selection in a more efficient way
284-
waitChainSelectionPromise =<< ChainSel.triggerChainSelectionAsync cdb
282+
, addPerasCertAsync = getEnv1 h ChainSel.addPerasCertAsync
285283
, getPerasWeightSnapshot = getEnvSTM h Query.getPerasWeightSnapshot
286284
}
287285
addBlockTestFuse <- newFuse "test chain selection"

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,8 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
546546
varBlockProcessed
547547
(FailedToAddBlock "Failed to add block synchronously")
548548
pure ()
549+
ChainSelAddPerasCert _cert varProcessed ->
550+
void $ tryPutTMVar varProcessed ()
549551
closeChainSelQueue cdbChainSelQueue
550552
)
551553
( \message -> do
@@ -554,6 +556,10 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
554556
trace PoppedReprocessLoEBlocksFromQueue
555557
ChainSelAddBlock BlockToAdd{blockToAdd} ->
556558
trace $ PoppedBlockFromQueue $ blockRealPoint blockToAdd
559+
ChainSelAddPerasCert cert _varProcessed ->
560+
traceWith cdbTracer $
561+
TraceAddPerasCertEvent $
562+
PoppedPerasCertFromQueue (perasCertRound cert) (perasCertBoostedBlock cert)
557563
chainSelSync cdb message
558564
lift $ atomically $ processedChainSelMessage cdbChainSelQueue message
559565
)

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
-- adding a block.
1414
module Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
1515
( addBlockAsync
16+
, addPerasCertAsync
1617
, chainSelSync
1718
, chainSelectionForBlock
1819
, initialChainSelection
@@ -68,6 +69,7 @@ import Ouroboros.Consensus.Peras.Weight
6869
import Ouroboros.Consensus.Storage.ChainDB.API
6970
( AddBlockPromise (..)
7071
, AddBlockResult (..)
72+
, AddPerasCertPromise
7173
, BlockComponent (..)
7274
, ChainType (..)
7375
, LoE (..)
@@ -91,10 +93,12 @@ import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB)
9193
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
9294
import Ouroboros.Consensus.Storage.LedgerDB
9395
import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB
96+
import qualified Ouroboros.Consensus.Storage.PerasCertDB.API as PerasCertDB
9497
import Ouroboros.Consensus.Storage.VolatileDB (VolatileDB)
9598
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
9699
import Ouroboros.Consensus.Util
97100
import Ouroboros.Consensus.Util.AnchoredFragment
101+
import Ouroboros.Consensus.Util.EarlyExit (exitEarly, withEarlyExit_)
98102
import Ouroboros.Consensus.Util.Enclose (encloseWith)
99103
import Ouroboros.Consensus.Util.IOLike
100104
import Ouroboros.Consensus.Util.STM (WithFingerprint (..))
@@ -323,6 +327,15 @@ addBlockAsync ::
323327
addBlockAsync CDB{cdbTracer, cdbChainSelQueue} =
324328
addBlockToAdd (TraceAddBlockEvent >$< cdbTracer) cdbChainSelQueue
325329

330+
addPerasCertAsync ::
331+
forall m blk.
332+
(IOLike m, HasHeader blk) =>
333+
ChainDbEnv m blk ->
334+
PerasCert blk ->
335+
m (AddPerasCertPromise m)
336+
addPerasCertAsync CDB{cdbTracer, cdbChainSelQueue} =
337+
addPerasCertToQueue (TraceAddPerasCertEvent >$< cdbTracer) cdbChainSelQueue
338+
326339
-- | Schedule reprocessing of blocks postponed by the LoE.
327340
triggerChainSelectionAsync ::
328341
forall m blk.
@@ -461,6 +474,65 @@ chainSelSync cdb@CDB{..} (ChainSelAddBlock BlockToAdd{blockToAdd = b, ..}) = do
461474
deliverProcessed tip =
462475
atomically $
463476
putTMVar varBlockProcessed (SuccesfullyAddedBlock tip)
477+
-- Process a Peras certificate by adding it to the PerasCertDB and potentially
478+
-- performing chain selection if a candidate is now better than our selection.
479+
chainSelSync cdb@CDB{..} (ChainSelAddPerasCert cert varProcessed) = do
480+
curChain <- lift $ atomically $ Query.getCurrentChain cdb
481+
let immTip = castPoint $ AF.anchorPoint curChain
482+
483+
withEarlyExit_ $ do
484+
-- Ignore the certificate if it boosts a block that is so old that it can't
485+
-- influence our selection.
486+
when (pointSlot boostedBlock < pointSlot immTip) $ do
487+
lift $ lift $ traceWith tracer $ IgnorePerasCertTooOld certRound boostedBlock immTip
488+
exitEarly
489+
490+
-- Add the certificate to the PerasCertDB.
491+
lift (lift $ PerasCertDB.addCert cdbPerasCertDB cert) >>= \case
492+
PerasCertDB.AddedPerasCertToDB -> pure ()
493+
-- If it already is in the PerasCertDB, we are done.
494+
PerasCertDB.PerasCertAlreadyInDB -> exitEarly
495+
496+
-- If the certificate boosts a block on our current chain (including the
497+
-- anchor), then it just makes our selection even stronger.
498+
when (AF.withinFragmentBounds (castPoint boostedBlock) curChain) $ do
499+
lift $ lift $ traceWith tracer $ PerasCertBoostsCurrentChain certRound boostedBlock
500+
exitEarly
501+
502+
boostedHash <- case pointHash boostedBlock of
503+
-- If the certificate boosts the Genesis point, then it can not influence
504+
-- chain selection as all chains contain it.
505+
GenesisHash -> do
506+
lift $ lift $ traceWith tracer $ PerasCertBoostsGenesis certRound
507+
exitEarly
508+
-- Otherwise, the certificate boosts a block potentially on a (future)
509+
-- candidate.
510+
BlockHash boostedHash -> pure boostedHash
511+
boostedHdr <-
512+
lift (lift $ VolatileDB.getBlockComponent cdbVolatileDB GetHeader boostedHash) >>= \case
513+
-- If we have not (yet) received the boosted block, we don't need to do
514+
-- anything further for now regarding chain selection. Once we receive
515+
-- it, the additional weight of the certificate is taken into account.
516+
Nothing -> do
517+
lift $ lift $ traceWith tracer $ PerasCertBoostsBlockNotYetReceived certRound boostedBlock
518+
exitEarly
519+
Just boostedHdr -> pure boostedHdr
520+
521+
-- Trigger chain selection for the boosted block.
522+
lift $ lift $ traceWith tracer $ ChainSelectionForBoostedBlock certRound boostedBlock
523+
lift $ chainSelectionForBlock cdb BlockCache.empty boostedHdr noPunishment
524+
525+
-- Deliver promise indicating that we processed the cert.
526+
lift $ atomically $ putTMVar varProcessed ()
527+
where
528+
tracer :: Tracer m (TraceAddPerasCertEvent blk)
529+
tracer = TraceAddPerasCertEvent >$< cdbTracer
530+
531+
certRound :: PerasRoundNo
532+
certRound = perasCertRound cert
533+
534+
boostedBlock :: Point blk
535+
boostedBlock = perasCertBoostedBlock cert
464536

465537
-- | Return 'True' when the given header should be ignored when adding it
466538
-- because it is too old, i.e., we wouldn't be able to switch to a chain

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types
5555
, ChainSelMessage (..)
5656
, ChainSelQueue -- opaque
5757
, addBlockToAdd
58+
, addPerasCertToQueue
5859
, addReprocessLoEBlocks
5960
, closeChainSelQueue
6061
, getChainSelMessage
@@ -66,6 +67,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types
6667
-- * Trace types
6768
, SelectionChangedInfo (..)
6869
, TraceAddBlockEvent (..)
70+
, TraceAddPerasCertEvent (..)
6971
, TraceChainSelStarvationEvent (..)
7072
, TraceCopyToImmutableDBEvent (..)
7173
, TraceEvent (..)
@@ -83,7 +85,6 @@ import Control.ResourceRegistry
8385
import Control.Tracer
8486
import Data.Foldable (traverse_)
8587
import Data.Map.Strict (Map)
86-
import Data.Maybe (mapMaybe)
8788
import Data.Maybe.Strict (StrictMaybe (..))
8889
import Data.MultiSet (MultiSet)
8990
import qualified Data.MultiSet as MultiSet
@@ -104,6 +105,7 @@ import Ouroboros.Consensus.Protocol.Abstract
104105
import Ouroboros.Consensus.Storage.ChainDB.API
105106
( AddBlockPromise (..)
106107
, AddBlockResult (..)
108+
, AddPerasCertPromise (..)
107109
, ChainDbError (..)
108110
, ChainSelectionPromise (..)
109111
, ChainType
@@ -549,6 +551,11 @@ data BlockToAdd m blk = BlockToAdd
549551
data ChainSelMessage m blk
550552
= -- | Add a new block
551553
ChainSelAddBlock !(BlockToAdd m blk)
554+
| -- | Add a Peras certificate
555+
ChainSelAddPerasCert
556+
!(PerasCert blk)
557+
-- | Used for 'AddPerasCertPromise'.
558+
!(StrictTMVar m ())
552559
| -- | Reprocess blocks that have been postponed by the LoE.
553560
ChainSelReprocessLoEBlocks
554561
-- | Used for 'ChainSelectionPromise'.
@@ -597,6 +604,28 @@ addBlockToAdd tracer (ChainSelQueue{varChainSelQueue, varChainSelPoints}) punish
597604
, blockProcessed = readTMVar varBlockProcessed
598605
}
599606

607+
-- | Add a Peras certificate to the background queue.
608+
addPerasCertToQueue ::
609+
(IOLike m, StandardHash blk) =>
610+
Tracer m (TraceAddPerasCertEvent blk) ->
611+
ChainSelQueue m blk ->
612+
PerasCert blk ->
613+
m (AddPerasCertPromise m)
614+
addPerasCertToQueue tracer ChainSelQueue{varChainSelQueue} cert = do
615+
varProcessed <- newEmptyTMVarIO
616+
traceWith tracer $ addedToQueue RisingEdge
617+
queueSize <- atomically $ do
618+
writeTBQueue varChainSelQueue $ ChainSelAddPerasCert cert varProcessed
619+
lengthTBQueue varChainSelQueue
620+
traceWith tracer $ addedToQueue $ FallingEdgeWith $ fromIntegral queueSize
621+
pure
622+
AddPerasCertPromise
623+
{ waitPerasCertProcessed = atomically $ takeTMVar varProcessed
624+
}
625+
where
626+
addedToQueue =
627+
AddedPerasCertToQueue (perasCertRound cert) (perasCertBoostedBlock cert)
628+
600629
-- | Try to add blocks again that were postponed due to the LoE.
601630
addReprocessLoEBlocks ::
602631
IOLike m =>
@@ -651,6 +680,7 @@ getChainSelMessage starvationTracer starvationVar chainSelQueue =
651680
let pt = blockRealPoint block
652681
traceWith starvationTracer $ ChainSelStarvation (FallingEdgeWith pt)
653682
atomically . writeTVar starvationVar . ChainSelStarvationEndedAt =<< getMonotonicTime
683+
ChainSelAddPerasCert{} -> pure ()
654684
ChainSelReprocessLoEBlocks{} -> pure ()
655685

656686
-- TODO Can't use tryReadTBQueue from io-classes because it is broken for IOSim
@@ -661,18 +691,15 @@ tryReadTBQueue' q = (Just <$> readTBQueue q) `orElse` pure Nothing
661691
-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
662692
closeChainSelQueue :: IOLike m => ChainSelQueue m blk -> STM m ()
663693
closeChainSelQueue ChainSelQueue{varChainSelQueue = queue} = do
664-
as <- mapMaybe blockAdd <$> flushTBQueue queue
665-
traverse_
666-
( \a ->
667-
tryPutTMVar
668-
(varBlockProcessed a)
669-
(FailedToAddBlock "Queue flushed")
670-
)
671-
as
694+
traverse_ deliverPromise =<< flushTBQueue queue
672695
where
673-
blockAdd = \case
674-
ChainSelAddBlock ab -> Just ab
675-
ChainSelReprocessLoEBlocks _ -> Nothing
696+
deliverPromise = \case
697+
ChainSelAddBlock ab ->
698+
tryPutTMVar (varBlockProcessed ab) (FailedToAddBlock "Queue flushed")
699+
ChainSelAddPerasCert _cert varProcessed ->
700+
tryPutTMVar varProcessed ()
701+
ChainSelReprocessLoEBlocks varProcessed ->
702+
tryPutTMVar varProcessed ()
676703

677704
-- | To invoke when the given 'ChainSelMessage' has been processed by ChainSel.
678705
-- This is used to remove the respective point from the multiset of points in
@@ -685,6 +712,8 @@ processedChainSelMessage ::
685712
processedChainSelMessage ChainSelQueue{varChainSelPoints} = \case
686713
ChainSelAddBlock BlockToAdd{blockToAdd = blk} ->
687714
modifyTVar varChainSelPoints $ MultiSet.delete (blockRealPoint blk)
715+
ChainSelAddPerasCert{} ->
716+
pure ()
688717
ChainSelReprocessLoEBlocks{} ->
689718
pure ()
690719

@@ -729,6 +758,7 @@ data TraceEvent blk
729758
| TracePerasCertDbEvent (PerasCertDB.TraceEvent blk)
730759
| TraceLastShutdownUnclean
731760
| TraceChainSelStarvationEvent (TraceChainSelStarvationEvent blk)
761+
| TraceAddPerasCertEvent (TraceAddPerasCertEvent blk)
732762
deriving Generic
733763

734764
deriving instance
@@ -1035,3 +1065,26 @@ data TraceIteratorEvent blk
10351065
newtype TraceChainSelStarvationEvent blk
10361066
= ChainSelStarvation (Enclosing' (RealPoint blk))
10371067
deriving (Generic, Eq, Show)
1068+
1069+
data TraceAddPerasCertEvent blk
1070+
= -- | The Peras certificate from the given round boosting the given block was
1071+
-- added to the queue. The size of the queue is included.
1072+
AddedPerasCertToQueue PerasRoundNo (Point blk) (Enclosing' Word)
1073+
| -- | The Peras certificate from the given round boosting the given block was
1074+
-- popped from the queue.
1075+
PoppedPerasCertFromQueue PerasRoundNo (Point blk)
1076+
| -- | The Peras certificate from the given round boosting the given block was
1077+
-- too old, ie its slot was older than the current immutable slot (the third
1078+
-- argument).
1079+
IgnorePerasCertTooOld PerasRoundNo (Point blk) (Point blk)
1080+
| -- | The Peras certificate from the given round boosts a block on the
1081+
-- current selection.
1082+
PerasCertBoostsCurrentChain PerasRoundNo (Point blk)
1083+
| -- | The Peras certificate from the given round boosts the Genesis point.
1084+
PerasCertBoostsGenesis PerasRoundNo
1085+
| -- | The Peras certificate from the given round boosts a block that we have
1086+
-- not (yet) received.
1087+
PerasCertBoostsBlockNotYetReceived PerasRoundNo (Point blk)
1088+
| -- | Perform chain selection for a block boosted by a Peras certificate.
1089+
ChainSelectionForBoostedBlock PerasRoundNo (Point blk)
1090+
deriving (Generic, Eq, Show)

ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/StateMachine.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,6 +1352,8 @@ deriving instance SOP.Generic (PerasCertDB.TraceEvent blk)
13521352
deriving instance SOP.HasDatatypeInfo (PerasCertDB.TraceEvent blk)
13531353
deriving anyclass instance SOP.Generic (TraceChainSelStarvationEvent blk)
13541354
deriving anyclass instance SOP.HasDatatypeInfo (TraceChainSelStarvationEvent blk)
1355+
deriving anyclass instance SOP.Generic (TraceAddPerasCertEvent blk)
1356+
deriving anyclass instance SOP.HasDatatypeInfo (TraceAddPerasCertEvent blk)
13551357

13561358
data Tag
13571359
= TagGetIsValidJust
@@ -1779,6 +1781,7 @@ traceEventName = \case
17791781
TracePerasCertDbEvent ev -> "PerasCertDB." <> constrName ev
17801782
TraceLastShutdownUnclean -> "LastShutdownUnclean"
17811783
TraceChainSelStarvationEvent ev -> "ChainSelStarvation." <> constrName ev
1784+
TraceAddPerasCertEvent ev -> "AddPerasCert." <> constrName ev
17821785

17831786
mkArgs ::
17841787
IOLike m =>

0 commit comments

Comments
 (0)