From dbe1fafaff37705f6fa77b643902f74f9d051bf7 Mon Sep 17 00:00:00 2001 From: Thomas BAGREL Date: Fri, 12 Sep 2025 14:37:41 +0200 Subject: [PATCH 1/8] Change s-r-p for `ouroboros-network` to add ObjectDiffusion support Also use defaultMiniProtocolParameters instead of hardcoded value in unstable-diffusion-testlib to account for newly defined parameters in the new `ouroboros-network` version. Also integrate `NodeToNodeV_16`. Co-authored-by: Agustin Mista Co-authored-by: Alexander Esgen Co-authored-by: Georgy Lukyanov Co-authored-by: Thomas BAGREL Co-authored-by: Nicolas BACQUEY Co-authored-by: Nicolas "Niols" Jeannerod --- cabal.project | 8 +++++--- .../Ouroboros/Consensus/Cardano/Node.hs | 1 + .../Consensus/Shelley/Ledger/NetworkProtocolVersion.hs | 1 + .../Test/ThreadNet/Network.hs | 10 ++-------- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/cabal.project b/cabal.project index 3f7ef26e1f..d468545b24 100644 --- a/cabal.project +++ b/cabal.project @@ -91,12 +91,14 @@ allow-newer: , fin:QuickCheck , bin:QuickCheck --- Backported version of https://github.com/IntersectMBO/ouroboros-network/pull/5161 +-- Using https://github.com/IntersectMBO/ouroboros-network/tree/peras-staging/pr-5202-v2 source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-network - tag: b07a86ed853b63881b5a83e57508902f1562ac01 - --sha256: sha256-n/XX0+cQegq2a1cAfmGx30T64eix4oEXzpVEFCKqmg0= + tag: 0db8669b67982cba755e80bf2e413527def41244 + --sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ= subdir: + ouroboros-network + ouroboros-network-protocols ouroboros-network-api ouroboros-network diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs index 52a7e4f910..98093f86c5 100644 --- a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs @@ -430,6 +430,7 @@ instance Map.fromList $ [ (NodeToNodeV_14, CardanoNodeToNodeVersion2) , (NodeToNodeV_15, CardanoNodeToNodeVersion2) + , (NodeToNodeV_16, CardanoNodeToNodeVersion2) ] supportedNodeToClientVersions _ = diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs index c03e0e5179..7003a5ce8a 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs @@ -48,6 +48,7 @@ instance SupportedNetworkProtocolVersion (ShelleyBlock proto era) where Map.fromList [ (NodeToNodeV_14, ShelleyNodeToNodeVersion1) , (NodeToNodeV_15, ShelleyNodeToNodeVersion1) + , (NodeToNodeV_16, ShelleyNodeToNodeVersion1) ] supportedNodeToClientVersions _ = Map.fromList diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index 6df036c539..847659c2ad 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -123,8 +123,8 @@ import Ouroboros.Network.NodeToNode ( ConnectionId (..) , ExpandedInitiatorContext (..) , IsBigLedgerPeer (..) - , MiniProtocolParameters (..) , ResponderContext (..) + , defaultMiniProtocolParameters ) import Ouroboros.Network.PeerSelection.Governor ( makePublicPeerSelectionStateVar @@ -1056,13 +1056,7 @@ runThreadNetwork , mempoolCapacityOverride = NoMempoolCapacityBytesOverride , keepAliveRng = kaRng , peerSharingRng = psRng - , miniProtocolParameters = - MiniProtocolParameters - { chainSyncPipeliningHighMark = 4 - , chainSyncPipeliningLowMark = 2 - , blockFetchPipeliningMax = 10 - , txSubmissionMaxUnacked = 1000 -- TODO ? - } + , miniProtocolParameters = defaultMiniProtocolParameters , blockFetchConfiguration = BlockFetchConfiguration { bfcMaxConcurrencyBulkSync = 1 From cd37dba6b492f5b3ca1670c816118ae913f54d9b Mon Sep 17 00:00:00 2001 From: Thomas BAGREL Date: Fri, 12 Sep 2025 14:40:00 +0200 Subject: [PATCH 2/8] Implement general ObjectDiffusion protocol, and related `ObjectPool{Reader,Writer}` API Co-authored-by: Agustin Mista Co-authored-by: Alexander Esgen Co-authored-by: Georgy Lukyanov Co-authored-by: Thomas BAGREL Co-authored-by: Nicolas BACQUEY Co-authored-by: Nicolas "Niols" Jeannerod --- ouroboros-consensus/ouroboros-consensus.cabal | 3 + .../MiniProtocol/ObjectDiffusion/Inbound.hs | 454 ++++++++++++++++++ .../ObjectDiffusion/ObjectPool/API.hs | 59 +++ .../MiniProtocol/ObjectDiffusion/Outbound.hs | 244 ++++++++++ 4 files changed, 760 insertions(+) create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 5f2cd98720..2eaf42d454 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -191,6 +191,9 @@ library Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound Ouroboros.Consensus.Node.GsmState Ouroboros.Consensus.Node.InitStorage Ouroboros.Consensus.Node.NetworkProtocolVersion diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs new file mode 100644 index 0000000000..f72299d0fe --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs @@ -0,0 +1,454 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound + ( objectDiffusionInbound + , TraceObjectDiffusionInbound (..) + , ObjectDiffusionInboundError (..) + , NumObjectsProcessed (..) + ) where + +import Cardano.Prelude (catMaybes, (&)) +import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked +import Control.Exception (assert) +import Control.Monad (when) +import Control.Monad.Class.MonadSTM +import Control.Monad.Class.MonadThrow +import Control.Tracer (Tracer, traceWith) +import Data.Foldable as Foldable (foldl', toList) +import Data.List qualified as List +import Data.List.NonEmpty qualified as NonEmpty +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map +import Data.Sequence.Strict (StrictSeq) +import Data.Sequence.Strict qualified as Seq +import Data.Set (Set) +import Data.Set qualified as Set +import Data.Word (Word64) +import GHC.Generics (Generic) +import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt) +import NoThunks.Class (NoThunks (..)) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.Util.NormalForm.Invariant (noThunksInvariant) +import Ouroboros.Network.ControlMessage +import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + +-- Note: This module is inspired from TxSubmission inbound side. + +newtype NumObjectsProcessed + = NumObjectsProcessed + { getNumObjectsProcessed :: Word64 + } + deriving (Eq, Show) + +data TraceObjectDiffusionInbound objectId object + = -- | Number of objects just about to be inserted. + TraceObjectDiffusionInboundCollectedObjects Int + | -- | Just processed object pass/fail breakdown. + TraceObjectDiffusionInboundAddedObjects NumObjectsProcessed + | -- | Received a 'ControlMessage' from the outbound peer governor, and about + -- to act on it. + TraceObjectDiffusionInboundRecvControlMessage ControlMessage + | TraceObjectDiffusionInboundCanRequestMoreObjects Int + | TraceObjectDiffusionInboundCannotRequestMoreObjects Int + deriving (Eq, Show) + +data ObjectDiffusionInboundError + = ProtocolErrorObjectNotRequested + | ProtocolErrorObjectIdsNotRequested + | ProtocolErrorObjectIdAlreadyKnown + | ProtocolErrorObjectIdsDuplicate + deriving Show + +instance Exception ObjectDiffusionInboundError where + displayException ProtocolErrorObjectNotRequested = + "The peer replied with a object we did not ask for." + displayException ProtocolErrorObjectIdsNotRequested = + "The peer replied with more objectIds than we asked for." + displayException ProtocolErrorObjectIdAlreadyKnown = + "The peer replied with an objectId that it has already sent us previously." + displayException ProtocolErrorObjectIdsDuplicate = + "The peer replied with a batch of objectIds containing a duplicate." + +-- | Information maintained internally in the 'objectDiffusionInbound' +-- implementation. +data InboundSt objectId object = InboundSt + { numIdsInFlight :: !NumObjectIdsReq + -- ^ The number of object identifiers that we have requested but + -- which have not yet been replied to. We need to track this to keep + -- our requests within the limit on the 'outstandingFifo' size. + , outstandingFifo :: !(StrictSeq objectId) + -- ^ This mirrors the queue of objects that the outbound peer has available + -- for us. Objects are kept in the order in which the outbound peer + -- advertised them to us. This is the same order in which we submit them to + -- the objectPool. It is also the order we acknowledge them. + , canRequestNext :: !(Set objectId) + -- ^ The objectIds that we can request. These are a subset of the + -- 'outstandingFifo' that we have not yet requested or not have in the pool + -- already. This is not ordered to illustrate the fact that we can + -- request objects out of order. + , pendingObjects :: !(Map objectId (Maybe object)) + -- ^ Objects we have successfully downloaded (or decided intentionally to + -- skip download) but have not yet added to the objectPool or acknowledged. + -- + -- Object IDs in this 'Map' are mapped to 'Nothing' if we notice that + -- they are already in the objectPool. That way we can skip requesting them + -- from the outbound peer, but still acknowledge them when the time comes. + , numToAckOnNextReq :: !NumObjectIdsAck + -- ^ The number of objects we can acknowledge on our next request + -- for more object IDs. Their corresponding IDs have already been removed + -- from 'outstandingFifo'. + } + deriving stock (Show, Generic) + deriving anyclass NoThunks + +initialInboundSt :: InboundSt objectId object +initialInboundSt = InboundSt 0 Seq.empty Set.empty Map.empty 0 + +objectDiffusionInbound :: + forall objectId object m. + ( Ord objectId + , NoThunks objectId + , NoThunks object + , MonadSTM m + , MonadThrow m + ) => + Tracer m (TraceObjectDiffusionInbound objectId object) -> + -- | Maximum values for outstanding FIFO length, number of IDs to request, + -- and number of objects to request + (NumObjectsOutstanding, NumObjectIdsReq, NumObjectsReq) -> + ObjectPoolWriter objectId object m -> + NodeToNodeVersion -> + ControlMessageSTM m -> + ObjectDiffusionInboundPipelined objectId object m () +objectDiffusionInbound + tracer + (maxFifoLength, maxNumIdsToReq, maxNumObjectsToReq) + ObjectPoolWriter{..} + _version + controlMessageSTM = + ObjectDiffusionInboundPipelined $! + checkState initialInboundSt & go Zero + where + canRequestMoreObjects :: InboundSt k object -> Bool + canRequestMoreObjects !st = + not (Set.null (canRequestNext st)) + + -- Computes how many new IDs we can request so that receiving all of them + -- won't make 'outstandingFifo' exceed 'maxFifoLength'. + numIdsToReq :: InboundSt objectId object -> NumObjectIdsReq + numIdsToReq !st = + maxNumIdsToReq + `min` ( fromIntegral maxFifoLength + - (fromIntegral $ Seq.length $ outstandingFifo st) + - numIdsInFlight st + ) + + -- Updates 'InboundSt' with new object IDs and return the updated 'InboundSt'. + -- + -- Collected object IDs that are already in the objectPool are pre-emptively + -- acknowledged so that we don't need to bother requesting them from the + -- outbound peer. + preAcknowledge :: + InboundSt objectId object -> + (objectId -> Bool) -> + [objectId] -> + InboundSt objectId object + preAcknowledge !st _ collectedIds | null collectedIds = st + preAcknowledge !st poolHasObject collectedIds = + let + -- Divide the collected IDs in two parts: those that are already in the + -- objectPool and those that are not. + (alreadyObtained, notYetObtained) = + List.partition + poolHasObject + collectedIds + + -- The objects that we intentionally don't request, because they are + -- already in the objectPool, will need to be acknowledged. + -- So we extend 'pendingObjects' with those objects (so of course they + -- have no corresponding reply). + pendingObjects' = + foldl' + (\accMap objectId -> Map.insert objectId Nothing accMap) + (pendingObjects st) + alreadyObtained + + -- We initially extend 'outstandingFifo' with the all the collected IDs + -- (to properly mirror the server state). + outstandingFifo' = outstandingFifo st <> Seq.fromList collectedIds + + -- Now check if the update of 'pendingObjects' let us acknowledge a prefix + -- of the 'outstandingFifo', as we do in 'goCollect' -> 'CollectObjects'. + (objectIdsToAck, outstandingFifo'') = + Seq.spanl (`Map.member` pendingObjects') outstandingFifo' + + -- If so we can remove them from the 'pendingObjects' structure. + -- + -- Note that unlike in TX-Submission, we made sure the outstanding FIFO + -- couldn't have duplicate IDs, so we don't have to worry about re-adding + -- the duplicate IDs to 'pendingObjects' for future acknowledgment. + pendingObjects'' = + Foldable.foldl' + (flip Map.delete) + pendingObjects' + objectIdsToAck + + !st' = + st + { canRequestNext = canRequestNext st <> (Set.fromList notYetObtained) + , pendingObjects = pendingObjects'' + , outstandingFifo = outstandingFifo'' + , numToAckOnNextReq = + numToAckOnNextReq st + + fromIntegral (Seq.length objectIdsToAck) + } + in + st' + + go :: + forall (n :: N). + Nat n -> + InboundSt objectId object -> + InboundStIdle n objectId object m () + go n !st = WithEffect $ do + -- Check whether we should continue engaging in the protocol. + ctrlMsg <- atomically controlMessageSTM + traceWith tracer $ + TraceObjectDiffusionInboundRecvControlMessage ctrlMsg + case ctrlMsg of + -- The peer selection governor is asking us to terminate the connection. + Terminate -> + pure $! terminateAfterDrain n + -- Otherwise, we can continue the protocol normally. + _continue -> case n of + -- We didn't pipeline any requests, so there are no replies in flight + -- (nothing to collect) + Zero -> do + if canRequestMoreObjects st + then do + -- There are no replies in flight, but we do know some more objects + -- we can ask for, so lets ask for them and more objectIds in a + -- pipelined way. + traceWith tracer $ + TraceObjectDiffusionInboundCanRequestMoreObjects (natToInt n) + pure $! checkState st & goReqObjectsAndObjectIdsPipelined Zero + else do + -- There's no replies in flight, and we have no more objects we can + -- ask for so the only remaining thing to do is to ask for more + -- objectIds. Since this is the only thing to do now, we make this a + -- blocking call. + traceWith tracer $ + TraceObjectDiffusionInboundCannotRequestMoreObjects (natToInt n) + pure $! checkState st & goReqObjectIdsBlocking + + -- We have pipelined some requests, so there are some replies in flight. + Succ n' -> + if canRequestMoreObjects st + then do + -- We have replies in flight and we should eagerly collect them if + -- available, but there are objects to request too so we + -- should *not* block waiting for replies. + -- So we ask for new objects and objectIds in a pipelined way. + traceWith tracer $ + TraceObjectDiffusionInboundCanRequestMoreObjects (natToInt n) + pure $! + CollectPipelined + (Just (checkState st & goReqObjectsAndObjectIdsPipelined (Succ n'))) + (\collected -> checkState st & goCollect n' collected) + else do + traceWith tracer $ + TraceObjectDiffusionInboundCannotRequestMoreObjects (natToInt n) + -- In this case we can theoretically only collect replies or request + -- new object IDs. + -- + -- But it's important not to pipeline more requests for objectIds now + -- because if we did, then immediately after sending the request (but + -- having not yet received a response to either this or the other + -- pipelined requests), we would directly re-enter this code path, + -- resulting us in filling the pipeline with an unbounded number of + -- requests. + -- + -- So we instead block until we collect a reply. + pure $! + CollectPipelined + Nothing + (\collected -> checkState st & goCollect n' collected) + + goCollect :: + forall (n :: N). + Nat n -> + Collect objectId object -> + InboundSt objectId object -> + InboundStIdle n objectId object m () + goCollect n collect !st = case collect of + CollectObjectIds numIdsRequested collectedIds -> WithEffect $ do + let numCollectedIds = length collectedIds + collectedIdsSet = Set.fromList collectedIds + + -- Check they didn't send more than we asked for. We don't need to + -- check for a minimum: the blocking case checks for non-zero + -- elsewhere, and for the non-blocking case it is quite normal for + -- them to send us none. + when (numCollectedIds > fromIntegral numIdsRequested) $ + throwIO ProtocolErrorObjectIdsNotRequested + + -- Check that the server didn't send IDs that were already in the + -- outstanding FIFO + when (any (`Set.member` collectedIdsSet) (outstandingFifo st)) $ + throwIO ProtocolErrorObjectIdAlreadyKnown + + -- Check that the server didn't send duplicate IDs in its response + when (Set.size collectedIdsSet /= numCollectedIds) $ + throwIO ProtocolErrorObjectIdsDuplicate + + -- We extend our outstanding FIFO with the newly received objectIds by + -- calling 'preAcknowledge' which will also pre-emptively acknowledge the + -- objectIds that we already have in the pool and thus don't need to + -- request. + let !st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested} + poolHasObject <- atomically $ opwHasObject + let !st'' = preAcknowledge st' poolHasObject collectedIds + pure $! checkState st'' & go n + CollectObjects requestedIds collectedObjects -> WithEffect $ do + let requestedIdsSet = Set.fromList requestedIds + obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects) + + -- To start with we have to verify that the objects they have sent us are + -- exactly the objects we asked for, not more, not less. + when (requestedIdsSet /= obtainedIdsSet) $ + throwIO ProtocolErrorObjectNotRequested + + traceWith tracer $ + TraceObjectDiffusionInboundCollectedObjects (length collectedObjects) + + -- We update 'pendingObjects' with the newly obtained objects + let pendingObjects' = + foldl' + (\accMap object -> Map.insert (opwObjectId object) (Just object) accMap) + (pendingObjects st) + collectedObjects + + -- We then find the longest prefix of 'outstandingFifo' for which we have + -- all the corresponding IDs in 'pendingObjects'. + -- We remove this prefix from 'outstandingFifo'. + (objectIdsToAck, outstandingFifo') = + Seq.spanl (`Map.member` pendingObjects') (outstandingFifo st) + + -- And also remove these entries from 'pendingObjects'. + -- + -- Note that unlike in TX-Submission, we made sure the outstanding FIFO + -- couldn't have duplicate IDs, so we don't have to worry about re-adding + -- the duplicate IDs to 'pendingObjects' for future acknowledgment. + pendingObjects'' = + Foldable.foldl' + (flip Map.delete) + pendingObjects' + objectIdsToAck + + -- These are the objects we need to submit to the object pool + objectsToAck = + catMaybes $ + (((Map.!) pendingObjects') <$> toList objectIdsToAck) + + opwAddObjects objectsToAck + traceWith tracer $ + TraceObjectDiffusionInboundAddedObjects + (NumObjectsProcessed (fromIntegral $ length objectsToAck)) + + let !st' = + st + { pendingObjects = pendingObjects'' + , outstandingFifo = outstandingFifo' + , numToAckOnNextReq = + numToAckOnNextReq st + + fromIntegral (Seq.length objectIdsToAck) + } + pure $! checkState st' & go n + + goReqObjectIdsBlocking :: + InboundSt objectId object -> + InboundStIdle 'Z objectId object m () + goReqObjectIdsBlocking !st = + let numIdsToRequest = numIdsToReq st + -- We should only request new object IDs in a blocking way if we have + -- absolutely nothing else we can do. + !st' = + st + { numToAckOnNextReq = 0 + , numIdsInFlight = numIdsToRequest + } + in assert + ( numIdsInFlight st == 0 + && Seq.null (outstandingFifo st) + && Set.null (canRequestNext st) + && Map.null (pendingObjects st) + ) + $ SendMsgRequestObjectIdsBlocking + (numToAckOnNextReq st) + numIdsToRequest + ( \neCollectedIds -> + checkState st' & goCollect Zero (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds)) + ) + + goReqObjectsAndObjectIdsPipelined :: + forall (n :: N). + Nat n -> + InboundSt objectId object -> + InboundStIdle n objectId object m () + goReqObjectsAndObjectIdsPipelined n !st = + -- TODO: This implementation is deliberately naive, we pick in an + -- arbitrary order. We may want to revisit this later. + let (toRequest, canRequestNext') = + Set.splitAt (fromIntegral maxNumObjectsToReq) (canRequestNext st) + !st' = st{canRequestNext = canRequestNext'} + in SendMsgRequestObjectsPipelined + (toList toRequest) + (checkState st' & goReqObjectIdsPipelined (Succ n)) + + goReqObjectIdsPipelined :: + forall (n :: N). + Nat n -> + InboundSt objectId object -> + InboundStIdle n objectId object m () + goReqObjectIdsPipelined n !st = + let numIdsToRequest = numIdsToReq st + in if numIdsToRequest <= 0 + then checkState st & go n + else + let !st' = + st + { numIdsInFlight = + numIdsInFlight st + + numIdsToRequest + , numToAckOnNextReq = 0 + } + in SendMsgRequestObjectIdsPipelined + (numToAckOnNextReq st) + numIdsToRequest + (checkState st' & go (Succ n)) + + -- Ignore all outstanding replies to messages we pipelined ("drain"), and then + -- terminate. + terminateAfterDrain :: + Nat n -> InboundStIdle n objectId object m () + terminateAfterDrain = \case + Zero -> SendMsgDone () + Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n + +-- | Helper to ensure that the `InboundSt` is free of unexpected thunks and +-- stays strict during the whole process +checkState :: NoThunks s => s -> s +checkState !st = checkInvariant (noThunksInvariant st) st diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs new file mode 100644 index 0000000000..2f949d8b3b --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs @@ -0,0 +1,59 @@ +-- | API for reading from and writing to object pools in the ObjectDiffusion +-- miniprotocol. +-- +-- The underlying object pool can be any database, such as a 'PerasCertDb' in +-- Peras certificate diffusion. +-- +-- 'ObjectPoolReader' is used on the outbound side of the protocol. Objects in +-- the pool are ordered by a strictly increasing ticket number ('ticketNo'), +-- which represents their time of arrival. Ticket numbers are local to each +-- node, unlike object IDs, which are global. Object IDs are not used for +-- ordering, since objects may arrive slightly out of order from peers. +-- +-- To read from the pool, one requests objects with a ticket number strictly +-- greater than the last known one. 'oprZeroTicketNo' provides an initial ticket +-- number for the first request. +-- +-- 'ObjectPoolWriter' is used on the inbound side of the protocol. It allows +-- checking whether an object is already present (to avoid re-requesting it) and +-- appending new objects. Ticket numbers are not part of the inbound interface, +-- but are used internally: newly added objects always receive a ticket number +-- strictly greater than those of older ones. +-- +-- This API design is inspired by 'MempoolSnapshot' from the TX-submission +-- miniprotocol, see: +-- +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API + ( ObjectPoolReader (..) + , ObjectPoolWriter (..) + ) where + +import Control.Concurrent.Class.MonadSTM.Strict (STM) +import Data.Word (Word64) + +-- | Interface used by the outbound side of object diffusion as its source of +-- objects to give to the remote side. +data ObjectPoolReader objectId object ticketNo m + = ObjectPoolReader + { oprObjectId :: object -> objectId + -- ^ Return the id of the specified object + , oprZeroTicketNo :: ticketNo + -- ^ Ticket number before the first item in the pool. + , oprObjectsAfter :: ticketNo -> Word64 -> STM m [(ticketNo, objectId, m object)] + -- ^ Get the list of objects available in the pool with a ticketNo greater + -- than the specified one. The number of returned objects is capped by the + -- given Word64. Only the IDs and ticketNos of the objects are directly + -- accessible; each actual object must be loaded through a monadic action. + } + +-- | Interface used by the inbound side of object diffusion when receiving +-- objects. +data ObjectPoolWriter objectId object m + = ObjectPoolWriter + { opwObjectId :: object -> objectId + -- ^ Return the id of the specified object + , opwAddObjects :: [object] -> m () + -- ^ Add a batch of objects to the objectPool. + , opwHasObject :: STM m (objectId -> Bool) + -- ^ Check if the object pool contains an object with the given id + } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs new file mode 100644 index 0000000000..34c90b9836 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs @@ -0,0 +1,244 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound + ( objectDiffusionOutbound + , TraceObjectDiffusionOutbound (..) + , ObjectDiffusionOutboundError (..) + ) where + +import Control.Exception (assert) +import Control.Monad (forM, unless, when) +import Control.Monad.Class.MonadSTM +import Control.Monad.Class.MonadThrow +import Control.Tracer (Tracer, traceWith) +import Data.List.NonEmpty qualified as NonEmpty +import Data.Sequence.Strict (StrictSeq) +import Data.Sequence.Strict qualified as Seq +import Data.Set qualified as Set +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + +-- Note: This module is inspired from TxSubmission outbound side. + +data TraceObjectDiffusionOutbound objectId object + = TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq + | -- | The IDs to be sent in the response + TraceObjectDiffusionOutboundSendMsgReplyObjectIds [objectId] + | -- | The IDs of the objects requested. + TraceObjectDiffusionOutboundRecvMsgRequestObjects + [objectId] + | -- | The objects to be sent in the response. + TraceObjectDiffusionOutboundSendMsgReplyObjects + [object] + | -- | Received 'MsgDone' + TraceObjectDiffusionOutboundTerminated + deriving Show + +data ObjectDiffusionOutboundError + = ProtocolErrorAckedTooManyObjectIds + | ProtocolErrorRequestedNothing + | ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq NumObjectsOutstanding + | ProtocolErrorRequestBlocking + | ProtocolErrorRequestNonBlocking + | ProtocolErrorRequestedUnavailableObject + | ProtocolErrorRequestedDuplicateObject + deriving Show + +instance Exception ObjectDiffusionOutboundError where + displayException ProtocolErrorAckedTooManyObjectIds = + "The peer tried to acknowledged more objectIds than are available to do so." + displayException (ProtocolErrorRequestedTooManyObjectIds reqNo maxUnacked) = + "The peer requested " + ++ show reqNo + ++ " objectIds which would put the " + ++ "total in flight over the limit of " + ++ show maxUnacked + displayException ProtocolErrorRequestedNothing = + "The peer requested zero objectIds." + displayException ProtocolErrorRequestBlocking = + "The peer made a blocking request for more objectIds when there are still " + ++ "unacknowledged objectIds. It should have used a non-blocking request." + displayException ProtocolErrorRequestNonBlocking = + "The peer made a non-blocking request for more objectIds when there are " + ++ "no unacknowledged objectIds. It should have used a blocking request." + displayException ProtocolErrorRequestedUnavailableObject = + "The peer requested an object which is not available, either " + ++ "because it was never available or because it was previously requested." + displayException ProtocolErrorRequestedDuplicateObject = + "The peer requested the same object twice." + +data OutboundSt objectId object ticketNo = OutboundSt + { outstandingFifo :: !(StrictSeq object) + , lastTicketNo :: !ticketNo + } + +objectDiffusionOutbound :: + forall objectId object ticketNo m. + (Ord objectId, Ord ticketNo, MonadSTM m, MonadThrow m) => + Tracer m (TraceObjectDiffusionOutbound objectId object) -> + -- | Maximum number of unacknowledged objectIds allowed + NumObjectsOutstanding -> + ObjectPoolReader objectId object ticketNo m -> + NodeToNodeVersion -> + ObjectDiffusionOutbound objectId object m () +objectDiffusionOutbound tracer maxFifoLength ObjectPoolReader{..} _version = + ObjectDiffusionOutbound (pure (makeBundle $ OutboundSt Seq.empty oprZeroTicketNo)) + where + makeBundle :: OutboundSt objectId object ticketNo -> OutboundStIdle objectId object m () + makeBundle !st = + OutboundStIdle + { recvMsgRequestObjectIds = recvMsgRequestObjectIds st + , recvMsgRequestObjects = recvMsgRequestObjects st + , recvMsgDone = traceWith tracer TraceObjectDiffusionOutboundTerminated + } + + updateStNewObjects :: + OutboundSt objectId object ticketNo -> + [(object, ticketNo)] -> + OutboundSt objectId object ticketNo + updateStNewObjects !OutboundSt{..} newObjectsWithTicketNos = + -- These objects should all be fresh + assert (all (\(_, ticketNo) -> ticketNo > lastTicketNo) newObjectsWithTicketNos) $ + let !outstandingFifo' = + outstandingFifo + <> (Seq.fromList $ fst <$> newObjectsWithTicketNos) + !lastTicketNo' + | null newObjectsWithTicketNos = lastTicketNo + | otherwise = snd $ last newObjectsWithTicketNos + in OutboundSt + { outstandingFifo = outstandingFifo' + , lastTicketNo = lastTicketNo' + } + + recvMsgRequestObjectIds :: + forall blocking. + OutboundSt objectId object ticketNo -> + SingBlockingStyle blocking -> + NumObjectIdsAck -> + NumObjectIdsReq -> + m (OutboundStObjectIds blocking objectId object m ()) + recvMsgRequestObjectIds !st@OutboundSt{..} blocking numIdsToAck numIdsToReq = do + traceWith tracer (TraceObjectDiffusionOutboundRecvMsgRequestObjectIds numIdsToReq) + + when (numIdsToAck > fromIntegral (Seq.length outstandingFifo)) $ + throwIO ProtocolErrorAckedTooManyObjectIds + + when + ( Seq.length outstandingFifo + - fromIntegral numIdsToAck + + fromIntegral numIdsToReq + > fromIntegral maxFifoLength + ) + $ throwIO (ProtocolErrorRequestedTooManyObjectIds numIdsToReq maxFifoLength) + + -- First we update our FIFO to remove the number of objectIds that the + -- inbound peer has acknowledged. + let !outstandingFifo' = Seq.drop (fromIntegral numIdsToAck) outstandingFifo + -- must specify the type here otherwise GHC complains about mismatch objectId types + st' :: OutboundSt objectId object ticketNo + !st' = st{outstandingFifo = outstandingFifo'} + + -- Grab info about any new objects after the last object ticketNo we've + -- seen, up to the number that the peer has requested. + case blocking of + ----------------------------------------------------------------------- + SingBlocking -> do + when (numIdsToReq == 0) $ + throwIO ProtocolErrorRequestedNothing + unless (Seq.null outstandingFifo') $ + throwIO ProtocolErrorRequestBlocking + + newContent <- atomically $ do + newObjectsWithTicketNos <- + oprObjectsAfter + lastTicketNo + (fromIntegral numIdsToReq) + check (not $ null newObjectsWithTicketNos) + pure newObjectsWithTicketNos + + newObjectsWithTicketNos <- forM newContent $ + \(ticketNo, _, getObject) -> do + object <- getObject + pure (object, ticketNo) + + let !newIds = oprObjectId . fst <$> newObjectsWithTicketNos + st'' = updateStNewObjects st' newObjectsWithTicketNos + + traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds) + + -- Assert objects is non-empty: we blocked until objects was + -- non-null, and we know numIdsToReq > 0, hence + -- `take numIdsToReq objects` is non-null. + assert (not $ null newObjectsWithTicketNos) $ + pure $ + SendMsgReplyObjectIds + (BlockingReply (NonEmpty.fromList $ newIds)) + (makeBundle st'') + + ----------------------------------------------------------------------- + SingNonBlocking -> do + when (numIdsToReq == 0 && numIdsToAck == 0) $ + throwIO ProtocolErrorRequestedNothing + when (Seq.null outstandingFifo') $ + throwIO ProtocolErrorRequestNonBlocking + + newContent <- + atomically $ + oprObjectsAfter lastTicketNo (fromIntegral numIdsToReq) + newObjectsWithTicketNos <- forM newContent $ + \(ticketNo, _, getObject) -> do + object <- getObject + pure (object, ticketNo) + + let !newIds = oprObjectId . fst <$> newObjectsWithTicketNos + st'' = updateStNewObjects st' newObjectsWithTicketNos + + traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds) + + pure (SendMsgReplyObjectIds (NonBlockingReply newIds) (makeBundle st'')) + + recvMsgRequestObjects :: + OutboundSt objectId object ticketNo -> + [objectId] -> + m (OutboundStObjects objectId object m ()) + recvMsgRequestObjects !st@OutboundSt{..} requestedIds = do + traceWith tracer (TraceObjectDiffusionOutboundRecvMsgRequestObjects requestedIds) + + -- All the objects correspond to advertised objectIds are already in the + -- outstandingFifo. So we don't need to read from the object pool here. + + -- I've optimized the search to do only one traversal of 'outstandingFifo'. + -- When the 'requestedIds' is exactly the whole 'outstandingFifo', then this + -- should take O(n * log n) time. + -- + -- TODO: We might need to revisit the underlying 'outstandingFifo' data + -- structure and the search if performance isn't sufficient when we'll use + -- ObjectDiffusion for votes diffusion (and not just cert diffusion). + + let requestedIdsSet = Set.fromList requestedIds + + when (Set.size requestedIdsSet /= length requestedIds) $ + throwIO ProtocolErrorRequestedDuplicateObject + + let requestedObjects = + foldr + ( \obj acc -> + if Set.member (oprObjectId obj) requestedIdsSet + then obj : acc + else acc + ) + [] + outstandingFifo + + when (Set.size requestedIdsSet /= length requestedObjects) $ + throwIO ProtocolErrorRequestedUnavailableObject + + traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjects requestedObjects) + + pure (SendMsgReplyObjects requestedObjects (makeBundle st)) From fa6cefed4f0fa9b85b427535aa5da1df34adc6b4 Mon Sep 17 00:00:00 2001 From: Thomas BAGREL Date: Fri, 12 Sep 2025 14:50:59 +0200 Subject: [PATCH 3/8] Add smoke tests for generic ObjectDiffusion Co-authored-by: Agustin Mista Co-authored-by: Alexander Esgen Co-authored-by: Georgy Lukyanov Co-authored-by: Thomas BAGREL Co-authored-by: Nicolas BACQUEY Co-authored-by: Nicolas "Niols" Jeannerod --- ouroboros-consensus/ouroboros-consensus.cabal | 1 + .../test/consensus-test/Main.hs | 2 + .../MiniProtocol/ObjectDiffusion/Smoke.hs | 302 ++++++++++++++++++ 3 files changed, 305 insertions(+) create mode 100644 ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 2eaf42d454..d5e65f2b72 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -667,6 +667,7 @@ test-suite consensus-test Test.Consensus.MiniProtocol.ChainSync.CSJ Test.Consensus.MiniProtocol.ChainSync.Client Test.Consensus.MiniProtocol.LocalStateQuery.Server + Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke Test.Consensus.Peras.WeightSnapshot Test.Consensus.Util.MonadSTM.NormalForm Test.Consensus.Util.Versioned diff --git a/ouroboros-consensus/test/consensus-test/Main.hs b/ouroboros-consensus/test/consensus-test/Main.hs index beddd1f7d2..439d7b3043 100644 --- a/ouroboros-consensus/test/consensus-test/Main.hs +++ b/ouroboros-consensus/test/consensus-test/Main.hs @@ -16,6 +16,7 @@ import qualified Test.Consensus.MiniProtocol.BlockFetch.Client (tests) import qualified Test.Consensus.MiniProtocol.ChainSync.CSJ (tests) import qualified Test.Consensus.MiniProtocol.ChainSync.Client (tests) import qualified Test.Consensus.MiniProtocol.LocalStateQuery.Server (tests) +import qualified Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke (tests) import qualified Test.Consensus.Peras.WeightSnapshot (tests) import qualified Test.Consensus.Util.MonadSTM.NormalForm (tests) import qualified Test.Consensus.Util.Versioned (tests) @@ -37,6 +38,7 @@ tests = , Test.Consensus.MiniProtocol.BlockFetch.Client.tests , Test.Consensus.MiniProtocol.ChainSync.CSJ.tests , Test.Consensus.MiniProtocol.ChainSync.Client.tests + , Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke.tests , Test.Consensus.MiniProtocol.LocalStateQuery.Server.tests , testGroup "Mempool" diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs new file mode 100644 index 0000000000..d2f21c9b66 --- /dev/null +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs @@ -0,0 +1,302 @@ +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | Smoke tests for the object diffusion protocol. This uses a trivial object +-- pool and checks that a few objects can indeed be transferred from the +-- outbound to the inbound peer. +module Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke + ( tests + , WithId (..) + , ListWithUniqueIds (..) + , ProtocolConstants + , prop_smoke_object_diffusion + ) where + +import Control.Monad.IOSim (runSimStrictShutdown) +import Control.ResourceRegistry (forkLinkedThread, waitAnyThread, withRegistry) +import Control.Tracer (Tracer, nullTracer, traceWith) +import Data.Containers.ListUtils (nubOrdOn) +import Data.Functor.Contravariant (contramap) +import Network.TypedProtocol.Channel (Channel, createConnectedChannels) +import Network.TypedProtocol.Codec (AnyMessage) +import Network.TypedProtocol.Driver.Simple (runPeer, runPipelinedPeer) +import NoThunks.Class (NoThunks) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound + ( objectDiffusionInbound + ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API + ( ObjectPoolReader (..) + , ObjectPoolWriter (..) + ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound) +import Ouroboros.Consensus.Util.IOLike + ( IOLike + , MonadDelay (..) + , MonadSTM (..) + , StrictTVar + , modifyTVar + , readTVar + , uncheckedNewTVarM + , writeTVar + ) +import Ouroboros.Network.ControlMessage (ControlMessage (..)) +import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion (..)) +import Ouroboros.Network.Protocol.ObjectDiffusion.Codec (codecObjectDiffusionId) +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound + ( ObjectDiffusionInboundPipelined + , objectDiffusionInboundPeerPipelined + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound + ( ObjectDiffusionOutbound + , objectDiffusionOutboundPeer + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + ( NumObjectIdsReq (..) + , NumObjectsOutstanding (..) + , NumObjectsReq (..) + , ObjectDiffusion + ) +import Test.QuickCheck +import Test.Tasty +import Test.Tasty.QuickCheck +import Test.Util.Orphans.Arbitrary () +import Test.Util.Orphans.IOLike () + +tests :: TestTree +tests = + testGroup + "ObjectDiffusion.Smoke" + [ testProperty + "ObjectDiffusion smoke test with mock objects" + prop_smoke + ] + +{------------------------------------------------------------------------------- + Provides a way to generate lists composed of objects with no duplicate ids, + with an Arbitrary instance +-------------------------------------------------------------------------------} + +class WithId a idTy | a -> idTy where + getId :: a -> idTy + +newtype ListWithUniqueIds a idTy = ListWithUniqueIds [a] + deriving (Eq, Show, Ord) + +instance (Ord idTy, WithId a idTy, Arbitrary a) => Arbitrary (ListWithUniqueIds a idTy) where + arbitrary = ListWithUniqueIds . nubOrdOn getId <$> arbitrary + +instance WithId SmokeObject SmokeObjectId where getId = getSmokeObjectId + +{------------------------------------------------------------------------------- + Mock objectPools +-------------------------------------------------------------------------------} + +newtype SmokeObjectId = SmokeObjectId Int + deriving (Eq, Ord, Show, NoThunks, Arbitrary) + +newtype SmokeObject = SmokeObject {getSmokeObjectId :: SmokeObjectId} + deriving (Eq, Ord, Show, NoThunks, Arbitrary) + +newtype SmokeObjectPool m = SmokeObjectPool (StrictTVar m [SmokeObject]) + +newObjectPool :: MonadSTM m => [SmokeObject] -> m (SmokeObjectPool m) +newObjectPool initialPoolContent = SmokeObjectPool <$> uncheckedNewTVarM initialPoolContent + +makeObjectPoolReader :: + MonadSTM m => SmokeObjectPool m -> ObjectPoolReader SmokeObjectId SmokeObject Int m +makeObjectPoolReader (SmokeObjectPool poolContentTvar) = + ObjectPoolReader + { oprObjectId = getSmokeObjectId + , oprObjectsAfter = \minTicketNo limit -> do + poolContent <- readTVar poolContentTvar + pure $ + take (fromIntegral limit) $ + drop (minTicketNo + 1) $ + ( (\(ticketNo, smokeObject) -> (ticketNo, getSmokeObjectId smokeObject, pure smokeObject)) + <$> zip [(0 :: Int) ..] poolContent + ) + , oprZeroTicketNo = -1 -- objectPoolObjectIdsAfter uses strict comparison, and first ticketNo is 0. + } + +makeObjectPoolWriter :: + MonadSTM m => SmokeObjectPool m -> ObjectPoolWriter SmokeObjectId SmokeObject m +makeObjectPoolWriter (SmokeObjectPool poolContentTvar) = + ObjectPoolWriter + { opwObjectId = getSmokeObjectId + , opwAddObjects = \objects -> do + atomically $ modifyTVar poolContentTvar (++ objects) + return () + , opwHasObject = do + poolContent <- readTVar poolContentTvar + pure $ \objectId -> any (\obj -> getSmokeObjectId obj == objectId) poolContent + } + +mkMockPoolInterfaces :: + MonadSTM m => + [SmokeObject] -> + m + ( ObjectPoolReader SmokeObjectId SmokeObject Int m + , ObjectPoolWriter SmokeObjectId SmokeObject m + , m [SmokeObject] + ) +mkMockPoolInterfaces objects = do + outboundPool <- newObjectPool objects + inboundPool@(SmokeObjectPool tvar) <- newObjectPool [] + + let outboundPoolReader = makeObjectPoolReader outboundPool + inboundPoolWriter = makeObjectPoolWriter inboundPool + + return (outboundPoolReader, inboundPoolWriter, atomically $ readTVar tvar) + +{------------------------------------------------------------------------------- + Main properties +-------------------------------------------------------------------------------} + +-- Protocol constants + +newtype ProtocolConstants + = ProtocolConstants (NumObjectsOutstanding, NumObjectIdsReq, NumObjectsReq) + deriving Show + +instance Arbitrary ProtocolConstants where + arbitrary = do + maxFifoSize <- choose (5, 20) + maxIdsToReq <- choose (3, maxFifoSize) + maxObjectsToReq <- choose (2, maxIdsToReq) + pure $ + ProtocolConstants + ( NumObjectsOutstanding maxFifoSize + , NumObjectIdsReq maxIdsToReq + , NumObjectsReq maxObjectsToReq + ) + +nodeToNodeVersion :: NodeToNodeVersion +nodeToNodeVersion = NodeToNodeV_14 + +prop_smoke :: ProtocolConstants -> ListWithUniqueIds SmokeObject idTy -> Property +prop_smoke protocolConstants (ListWithUniqueIds objects) = + prop_smoke_object_diffusion + protocolConstants + objects + runOutboundPeer + runInboundPeer + (mkMockPoolInterfaces objects) + where + runOutboundPeer outbound outboundChannel tracer = + runPeer + ((\x -> "Outbound (Server): " ++ show x) `contramap` tracer) + codecObjectDiffusionId + outboundChannel + (objectDiffusionOutboundPeer outbound) + >> pure () + + runInboundPeer inbound inboundChannel tracer = + runPipelinedPeer + ((\x -> "Inbound (Client): " ++ show x) `contramap` tracer) + codecObjectDiffusionId + inboundChannel + (objectDiffusionInboundPeerPipelined inbound) + >> pure () + +--- The core logic of the smoke test is shared between the generic smoke tests for ObjectDiffusion, and the ones specialised to PerasCert/PerasVote diffusion +prop_smoke_object_diffusion :: + ( Eq object + , Show object + , Ord objectId + , NoThunks objectId + , Show objectId + , NoThunks object + , Ord ticketNo + ) => + ProtocolConstants -> + [object] -> + ( forall m. + IOLike m => + ObjectDiffusionOutbound objectId object m () -> + Channel m (AnyMessage (ObjectDiffusion objectId object)) -> + (Tracer m String) -> + m () + ) -> + ( forall m. + IOLike m => + ObjectDiffusionInboundPipelined objectId object m () -> + (Channel m (AnyMessage (ObjectDiffusion objectId object))) -> + (Tracer m String) -> + m () + ) -> + ( forall m. + IOLike m => + m + ( ObjectPoolReader objectId object ticketNo m + , ObjectPoolWriter objectId object m + , m [object] + ) + ) -> + Property +prop_smoke_object_diffusion + (ProtocolConstants (maxFifoSize, maxIdsToReq, maxObjectsToReq)) + objects + runOutboundPeer + runInboundPeer + mkPoolInterfaces = + let + simulationResult = runSimStrictShutdown $ do + let tracer = nullTracer + + traceWith tracer "========== [ Starting ObjectDiffusion smoke test ] ==========" + traceWith tracer (show objects) + + (outboundPoolReader, inboundPoolWriter, getAllInboundPoolContent) <- mkPoolInterfaces + controlMessage <- uncheckedNewTVarM Continue + + let + inbound = + objectDiffusionInbound + tracer + ( maxFifoSize + , maxIdsToReq + , maxObjectsToReq + ) + inboundPoolWriter + nodeToNodeVersion + (readTVar controlMessage) + + outbound = + objectDiffusionOutbound + tracer + maxFifoSize + outboundPoolReader + nodeToNodeVersion + + withRegistry $ \reg -> do + (outboundChannel, inboundChannel) <- createConnectedChannels + outboundThread <- + forkLinkedThread reg "ObjectDiffusion Outbound peer thread" $ + runOutboundPeer outbound outboundChannel tracer + inboundThread <- + forkLinkedThread reg "ObjectDiffusion Inbound peer thread" $ + runInboundPeer inbound inboundChannel tracer + controlMessageThread <- forkLinkedThread reg "ObjectDiffusion Control thread" $ do + threadDelay 1000 -- give a head start to the other threads + atomically $ writeTVar controlMessage Terminate + threadDelay 1000 -- wait for the other threads to finish + waitAnyThread [outboundThread, inboundThread, controlMessageThread] + + traceWith tracer "========== [ ObjectDiffusion smoke test finished ] ==========" + poolContent <- getAllInboundPoolContent + + traceWith tracer "inboundPoolContent:" + traceWith tracer (show poolContent) + traceWith tracer "========== ======================================= ==========" + pure poolContent + in + case simulationResult of + Right inboundPoolContent -> inboundPoolContent === objects + Left msg -> counterexample (show msg) $ property False From cf5b096373a5b3d56670a9c68ecd5cbf0a654ed2 Mon Sep 17 00:00:00 2001 From: Thomas BAGREL Date: Fri, 12 Sep 2025 14:55:18 +0200 Subject: [PATCH 4/8] Add definitions and codec for `PerasCert` diffusion through ObjectDiffusion Co-authored-by: Agustin Mista Co-authored-by: Alexander Esgen Co-authored-by: Georgy Lukyanov Co-authored-by: Thomas BAGREL Co-authored-by: Nicolas BACQUEY Co-authored-by: Nicolas "Niols" Jeannerod --- ouroboros-consensus/ouroboros-consensus.cabal | 2 + .../ObjectDiffusion/ObjectPool/PerasCert.hs | 108 ++++++++++++++++++ .../MiniProtocol/ObjectDiffusion/PerasCert.hs | 41 +++++++ .../Ouroboros/Consensus/Node/Serialisation.hs | 40 ++++++- 4 files changed, 188 insertions(+), 3 deletions(-) create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index d5e65f2b72..31cb0c35d8 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -193,7 +193,9 @@ library Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert Ouroboros.Consensus.Node.GsmState Ouroboros.Consensus.Node.InitStorage Ouroboros.Consensus.Node.NetworkProtocolVersion diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs new file mode 100644 index 0000000000..1e9e966341 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs @@ -0,0 +1,108 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE StandaloneDeriving #-} + +-- | Instantiate 'ObjectPoolReader' and 'ObjectPoolWriter' using Peras +-- certificates from the 'PerasCertDB' (or the 'ChainDB' which is wrapping the +-- 'PerasCertDB'). +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert + ( makePerasCertPoolReaderFromCertDB + , makePerasCertPoolWriterFromCertDB + , makePerasCertPoolReaderFromChainDB + , makePerasCertPoolWriterFromChainDB + ) where + +import qualified Data.Map as Map +import GHC.Exception (throw) +import Ouroboros.Consensus.Block +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) +import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB +import Ouroboros.Consensus.Storage.PerasCertDB.API + ( PerasCertDB + , PerasCertSnapshot + , PerasCertTicketNo + ) +import qualified Ouroboros.Consensus.Storage.PerasCertDB.API as PerasCertDB +import Ouroboros.Consensus.Util.IOLike + +makePerasCertPoolReaderFromSnapshot :: + (IOLike m, StandardHash blk) => + STM m (PerasCertSnapshot blk) -> + ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m +makePerasCertPoolReaderFromSnapshot getCertSnapshot = + ObjectPoolReader + { oprObjectId = getPerasCertRound + , oprZeroTicketNo = PerasCertDB.zeroPerasCertTicketNo + , oprObjectsAfter = \lastKnown limit -> do + certSnapshot <- getCertSnapshot + pure $ + take (fromIntegral limit) $ + [ (ticketNo, getPerasCertRound cert, pure (getPerasCert cert)) + | (ticketNo, cert) <- + Map.toAscList $ + PerasCertDB.getCertsAfter certSnapshot lastKnown + ] + } + +makePerasCertPoolReaderFromCertDB :: + (IOLike m, StandardHash blk) => + PerasCertDB m blk -> ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m +makePerasCertPoolReaderFromCertDB perasCertDB = + makePerasCertPoolReaderFromSnapshot (PerasCertDB.getCertSnapshot perasCertDB) + +makePerasCertPoolWriterFromCertDB :: + (StandardHash blk, IOLike m) => + PerasCertDB m blk -> ObjectPoolWriter PerasRoundNo (PerasCert blk) m +makePerasCertPoolWriterFromCertDB perasCertDB = + ObjectPoolWriter + { opwObjectId = getPerasCertRound + , opwAddObjects = \certs -> do + validatePerasCerts certs + >>= mapM_ (PerasCertDB.addCert perasCertDB) + , opwHasObject = do + certSnapshot <- PerasCertDB.getCertSnapshot perasCertDB + pure $ PerasCertDB.containsCert certSnapshot + } + +makePerasCertPoolReaderFromChainDB :: + (IOLike m, StandardHash blk) => + ChainDB m blk -> ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m +makePerasCertPoolReaderFromChainDB chainDB = + makePerasCertPoolReaderFromSnapshot (ChainDB.getPerasCertSnapshot chainDB) + +makePerasCertPoolWriterFromChainDB :: + (StandardHash blk, IOLike m) => + ChainDB m blk -> ObjectPoolWriter PerasRoundNo (PerasCert blk) m +makePerasCertPoolWriterFromChainDB chainDB = + ObjectPoolWriter + { opwObjectId = getPerasCertRound + , opwAddObjects = \certs -> do + validatePerasCerts certs + >>= mapM_ (ChainDB.addPerasCertAsync chainDB) + , opwHasObject = do + certSnapshot <- ChainDB.getPerasCertSnapshot chainDB + pure $ PerasCertDB.containsCert certSnapshot + } + +data PerasCertInboundException + = forall blk. PerasCertValidationError (PerasValidationErr blk) + +deriving instance Show PerasCertInboundException + +instance Exception PerasCertInboundException + +-- | Validate a list of 'PerasCert's, throwing a 'PerasCertInboundException' if +-- any of them are invalid. +validatePerasCerts :: + (StandardHash blk, MonadThrow m) => + [PerasCert blk] -> + m [ValidatedPerasCert blk] +validatePerasCerts certs = do + let perasCfg = makePerasCfg Nothing + -- TODO replace the mocked-up Nothing with a real + -- 'BlockConfig' when all the plumbing is in place + -- see https://github.com/tweag/cardano-peras/issues/73 + -- see https://github.com/tweag/cardano-peras/issues/120 + case traverse (validatePerasCert perasCfg) certs of + Left validationErr -> throw (PerasCertValidationError validationErr) + Right validatedCerts -> return validatedCerts diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs new file mode 100644 index 0000000000..ba0ba934a2 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs @@ -0,0 +1,41 @@ +-- | This module defines type aliases for the ObjectDiffusion protocol applied +-- to PerasCert diffusion. +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert + ( TracePerasCertDiffusionInbound + , TracePerasCertDiffusionOutbound + , PerasCertPoolReader + , PerasCertPoolWriter + , PerasCertDiffusionInboundPipelined + , PerasCertDiffusionOutbound + , PerasCertDiffusion + ) where + +import Ouroboros.Consensus.Block +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound +import Ouroboros.Consensus.Storage.PerasCertDB.API +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound (ObjectDiffusionInboundPipelined) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound (ObjectDiffusionOutbound) +import Ouroboros.Network.Protocol.ObjectDiffusion.Type (ObjectDiffusion) + +type TracePerasCertDiffusionInbound blk = + TraceObjectDiffusionInbound PerasRoundNo (PerasCert blk) + +type TracePerasCertDiffusionOutbound blk = + TraceObjectDiffusionOutbound PerasRoundNo (PerasCert blk) + +type PerasCertPoolReader blk m = + ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m + +type PerasCertPoolWriter blk m = + ObjectPoolWriter PerasRoundNo (PerasCert blk) m + +type PerasCertDiffusionInboundPipelined blk m a = + ObjectDiffusionInboundPipelined PerasRoundNo (PerasCert blk) m a + +type PerasCertDiffusionOutbound blk m a = + ObjectDiffusionOutbound PerasRoundNo (PerasCert blk) m a + +type PerasCertDiffusion blk = + ObjectDiffusion PerasRoundNo (PerasCert blk) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs index 6520aae47c..6a4fc87229 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs @@ -6,8 +6,11 @@ {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE StandaloneKindSignatures #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE UndecidableInstances #-} -- | Serialisation for sending things across the network. @@ -33,8 +36,8 @@ module Ouroboros.Consensus.Node.Serialisation , Some (..) ) where -import Codec.CBOR.Decoding (Decoder) -import Codec.CBOR.Encoding (Encoding) +import Codec.CBOR.Decoding (Decoder, decodeListLenOf) +import Codec.CBOR.Encoding (Encoding, encodeListLen) import Codec.Serialise (Serialise (decode, encode)) import Data.Kind import Data.SOP.BasicFunctors @@ -47,7 +50,15 @@ import Ouroboros.Consensus.Ledger.SupportsMempool import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.TypeFamilyWrappers import Ouroboros.Consensus.Util (Some (..)) -import Ouroboros.Network.Block (unwrapCBORinCBOR, wrapCBORinCBOR) +import Ouroboros.Network.Block + ( Tip + , decodePoint + , decodeTip + , encodePoint + , encodeTip + , unwrapCBORinCBOR + , wrapCBORinCBOR + ) {------------------------------------------------------------------------------- NodeToNode @@ -173,6 +184,29 @@ deriving newtype instance SerialiseNodeToNode blk (GenTxId blk) => SerialiseNodeToNode blk (WrapGenTxId blk) +instance ConvertRawHash blk => SerialiseNodeToNode blk (Point blk) where + encodeNodeToNode _ccfg _version = encodePoint $ encodeRawHash (Proxy @blk) + decodeNodeToNode _ccfg _version = decodePoint $ decodeRawHash (Proxy @blk) + +instance ConvertRawHash blk => SerialiseNodeToNode blk (Tip blk) where + encodeNodeToNode _ccfg _version = encodeTip $ encodeRawHash (Proxy @blk) + decodeNodeToNode _ccfg _version = decodeTip $ decodeRawHash (Proxy @blk) + +instance SerialiseNodeToNode blk PerasRoundNo where + encodeNodeToNode _ccfg _version = encode + decodeNodeToNode _ccfg _version = decode +instance ConvertRawHash blk => SerialiseNodeToNode blk (PerasCert blk) where + -- Consistent with the 'Serialise' instance for 'PerasCert' defined in Ouroboros.Consensus.Block.SupportsPeras + encodeNodeToNode ccfg version PerasCert{..} = + encodeListLen 2 + <> encodeNodeToNode ccfg version pcCertRound + <> encodeNodeToNode ccfg version pcCertBoostedBlock + decodeNodeToNode ccfg version = do + decodeListLenOf 2 + pcCertRound <- decodeNodeToNode ccfg version + pcCertBoostedBlock <- decodeNodeToNode ccfg version + pure $ PerasCert pcCertRound pcCertBoostedBlock + deriving newtype instance SerialiseNodeToClient blk (GenTxId blk) => SerialiseNodeToClient blk (WrapGenTxId blk) From 9bed429e38c39904c2231c72cf5145e386d0709d Mon Sep 17 00:00:00 2001 From: Thomas BAGREL Date: Fri, 12 Sep 2025 14:58:12 +0200 Subject: [PATCH 5/8] Add smoke tests for PerasCertDiffusion Co-authored-by: Agustin Mista Co-authored-by: Alexander Esgen Co-authored-by: Georgy Lukyanov Co-authored-by: Thomas BAGREL Co-authored-by: Nicolas BACQUEY Co-authored-by: Nicolas "Niols" Jeannerod --- ouroboros-consensus/ouroboros-consensus.cabal | 1 + .../test/consensus-test/Main.hs | 2 + .../ObjectDiffusion/PerasCert/Smoke.hs | 134 ++++++++++++++++++ 3 files changed, 137 insertions(+) create mode 100644 ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 31cb0c35d8..091aeea1e9 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -669,6 +669,7 @@ test-suite consensus-test Test.Consensus.MiniProtocol.ChainSync.CSJ Test.Consensus.MiniProtocol.ChainSync.Client Test.Consensus.MiniProtocol.LocalStateQuery.Server + Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke Test.Consensus.Peras.WeightSnapshot Test.Consensus.Util.MonadSTM.NormalForm diff --git a/ouroboros-consensus/test/consensus-test/Main.hs b/ouroboros-consensus/test/consensus-test/Main.hs index 439d7b3043..79d681213a 100644 --- a/ouroboros-consensus/test/consensus-test/Main.hs +++ b/ouroboros-consensus/test/consensus-test/Main.hs @@ -16,6 +16,7 @@ import qualified Test.Consensus.MiniProtocol.BlockFetch.Client (tests) import qualified Test.Consensus.MiniProtocol.ChainSync.CSJ (tests) import qualified Test.Consensus.MiniProtocol.ChainSync.Client (tests) import qualified Test.Consensus.MiniProtocol.LocalStateQuery.Server (tests) +import qualified Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke (tests) import qualified Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke (tests) import qualified Test.Consensus.Peras.WeightSnapshot (tests) import qualified Test.Consensus.Util.MonadSTM.NormalForm (tests) @@ -39,6 +40,7 @@ tests = , Test.Consensus.MiniProtocol.ChainSync.CSJ.tests , Test.Consensus.MiniProtocol.ChainSync.Client.tests , Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke.tests + , Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke.tests , Test.Consensus.MiniProtocol.LocalStateQuery.Server.tests , testGroup "Mempool" diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs new file mode 100644 index 0000000000..bfdabbe57a --- /dev/null +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs @@ -0,0 +1,134 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE UndecidableInstances #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke (tests) where + +import Control.Tracer (contramap, nullTracer) +import Data.Functor.Identity (Identity (..)) +import qualified Data.List.NonEmpty as NE +import qualified Data.Map as Map +import Network.TypedProtocol.Driver.Simple (runPeer, runPipelinedPeer) +import Ouroboros.Consensus.Block.SupportsPeras +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert +import Ouroboros.Consensus.Storage.PerasCertDB.API + ( AddPerasCertResult (..) + , PerasCertDB + , PerasCertTicketNo + ) +import qualified Ouroboros.Consensus.Storage.PerasCertDB.API as PerasCertDB +import qualified Ouroboros.Consensus.Storage.PerasCertDB.Impl as PerasCertDB +import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Network.Block (Point (..), SlotNo (SlotNo), StandardHash) +import Ouroboros.Network.Point (Block (Block), WithOrigin (..)) +import Ouroboros.Network.Protocol.ObjectDiffusion.Codec +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound + ( objectDiffusionInboundPeerPipelined + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound (objectDiffusionOutboundPeer) +import Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke + ( ListWithUniqueIds (..) + , ProtocolConstants + , WithId + , getId + , prop_smoke_object_diffusion + ) +import Test.QuickCheck +import Test.Tasty +import Test.Tasty.QuickCheck (testProperty) +import Test.Util.TestBlock + +tests :: TestTree +tests = + testGroup + "ObjectDiffusion.PerasCert.Smoke" + [ testProperty "PerasCertDiffusion smoke test" prop_smoke + ] + +instance Arbitrary (Point TestBlock) where + arbitrary = + -- Sometimes pick the genesis point + frequency + [ (1, pure $ Point Origin) + , + ( 4 + , do + slotNo <- SlotNo <$> arbitrary + hash <- TestHash . NE.fromList . getNonEmpty <$> arbitrary + pure $ Point (At (Block slotNo hash)) + ) + ] + +instance Arbitrary (Point blk) => Arbitrary (PerasCert blk) where + arbitrary = do + pcCertRound <- PerasRoundNo <$> arbitrary + pcCertBoostedBlock <- arbitrary + pure $ PerasCert{pcCertRound, pcCertBoostedBlock} + +instance WithId (PerasCert blk) PerasRoundNo where + getId = pcCertRound + +newCertDB :: (IOLike m, StandardHash blk) => [PerasCert blk] -> m (PerasCertDB m blk) +newCertDB certs = do + db <- PerasCertDB.openDB (PerasCertDB.PerasCertDbArgs @Identity nullTracer) + mapM_ + ( \cert -> do + let validatedCert = + ValidatedPerasCert + { vpcCert = cert + , vpcCertBoost = boostPerCert + } + result <- PerasCertDB.addCert db validatedCert + case result of + AddedPerasCertToDB -> pure () + PerasCertAlreadyInDB -> throwIO (userError "Expected AddedPerasCertToDB, but cert was already in DB") + ) + certs + pure db + +prop_smoke :: ProtocolConstants -> ListWithUniqueIds (PerasCert TestBlock) PerasRoundNo -> Property +prop_smoke protocolConstants (ListWithUniqueIds certs) = + prop_smoke_object_diffusion protocolConstants certs runOutboundPeer runInboundPeer mkPoolInterfaces + where + runOutboundPeer outbound outboundChannel tracer = + runPeer + ((\x -> "Outbound (Client): " ++ show x) `contramap` tracer) + codecObjectDiffusionId + outboundChannel + (objectDiffusionOutboundPeer outbound) + >> pure () + runInboundPeer inbound inboundChannel tracer = + runPipelinedPeer + ((\x -> "Inbound (Server): " ++ show x) `contramap` tracer) + codecObjectDiffusionId + inboundChannel + (objectDiffusionInboundPeerPipelined inbound) + >> pure () + mkPoolInterfaces :: + forall m. + IOLike m => + m + ( ObjectPoolReader PerasRoundNo (PerasCert TestBlock) PerasCertTicketNo m + , ObjectPoolWriter PerasRoundNo (PerasCert TestBlock) m + , m [PerasCert TestBlock] + ) + mkPoolInterfaces = do + outboundPool <- newCertDB certs + inboundPool <- newCertDB [] + + let outboundPoolReader = makePerasCertPoolReaderFromCertDB outboundPool + inboundPoolWriter = makePerasCertPoolWriterFromCertDB inboundPool + getAllInboundPoolContent = do + snap <- atomically $ PerasCertDB.getCertSnapshot inboundPool + let rawContent = + Map.toAscList $ + PerasCertDB.getCertsAfter snap (PerasCertDB.zeroPerasCertTicketNo) + pure $ getPerasCert . snd <$> rawContent + + return (outboundPoolReader, inboundPoolWriter, getAllInboundPoolContent) From e0d3f6b28afbd66fb2910d7939bb9907052b0cf2 Mon Sep 17 00:00:00 2001 From: Thomas BAGREL Date: Fri, 12 Sep 2025 15:07:26 +0200 Subject: [PATCH 6/8] Register and wire-in PerasCertDiffusion in the network layer Co-authored-by: Agustin Mista Co-authored-by: Alexander Esgen Co-authored-by: Georgy Lukyanov Co-authored-by: Thomas BAGREL Co-authored-by: Nicolas BACQUEY Co-authored-by: Nicolas "Niols" Jeannerod --- .../Ouroboros/Consensus/Network/NodeToNode.hs | 176 +++++++++++++++--- .../Ouroboros/Consensus/Node.hs | 2 + .../Ouroboros/Consensus/Node/Tracers.hs | 11 ++ .../Test/ThreadNet/Network.hs | 6 + .../test/mock-test/Test/ThreadNet/BFT.hs | 1 + 5 files changed, 166 insertions(+), 30 deletions(-) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index bc66cf78d6..294aace61f 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -54,6 +54,7 @@ import qualified Data.ByteString.Lazy as BSL import Data.Hashable (Hashable) import Data.Int (Int64) import Data.Map.Strict (Map) +import qualified Data.Set as Set import Data.Void (Void) import qualified Network.Mux as Mux import Network.TypedProtocol.Codec @@ -68,6 +69,10 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client ) import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient import Ouroboros.Consensus.MiniProtocol.ChainSync.Server +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert import Ouroboros.Consensus.Node.ExitPolicy import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Node.Run @@ -81,10 +86,6 @@ import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.Orphans () import Ouroboros.Network.Block ( Serialised (..) - , decodePoint - , decodeTip - , encodePoint - , encodeTip ) import Ouroboros.Network.BlockFetch import Ouroboros.Network.BlockFetch.Client @@ -124,6 +125,18 @@ import Ouroboros.Network.Protocol.KeepAlive.Client import Ouroboros.Network.Protocol.KeepAlive.Codec import Ouroboros.Network.Protocol.KeepAlive.Server import Ouroboros.Network.Protocol.KeepAlive.Type +import Ouroboros.Network.Protocol.ObjectDiffusion.Codec + ( byteLimitsObjectDiffusion + , codecObjectDiffusion + , codecObjectDiffusionId + , timeLimitsObjectDiffusion + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound + ( objectDiffusionInboundPeerPipelined + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound + ( objectDiffusionOutboundPeer + ) import Ouroboros.Network.Protocol.PeerSharing.Client ( PeerSharingClient , peerSharingClientPeer @@ -197,6 +210,15 @@ data Handlers m addr blk = Handlers NodeToNodeVersion -> ConnectionId addr -> TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m () + , hPerasCertDiffusionClient :: + NodeToNodeVersion -> + ControlMessageSTM m -> + ConnectionId addr -> + PerasCertDiffusionInboundPipelined blk m () + , hPerasCertDiffusionServer :: + NodeToNodeVersion -> + ConnectionId addr -> + PerasCertDiffusionOutbound blk m () , hKeepAliveClient :: NodeToNodeVersion -> ControlMessageSTM m -> @@ -293,6 +315,22 @@ mkHandlers (mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool) (getMempoolWriter getMempool) version + , hPerasCertDiffusionClient = \version controlMessageSTM peer -> + objectDiffusionInbound + (contramap (TraceLabelPeer peer) (Node.perasCertDiffusionInboundTracer tracers)) + ( perasCertDiffusionMaxFifoLength miniProtocolParameters + , 10 -- TODO: see https://github.com/tweag/cardano-peras/issues/97 + , 10 -- TODO: see https://github.com/tweag/cardano-peras/issues/97 + ) + (makePerasCertPoolWriterFromChainDB $ getChainDB) + version + controlMessageSTM + , hPerasCertDiffusionServer = \version peer -> + objectDiffusionOutbound + (contramap (TraceLabelPeer peer) (Node.perasCertDiffusionOutboundTracer tracers)) + (perasCertDiffusionMaxFifoLength miniProtocolParameters) + (makePerasCertPoolReaderFromChainDB $ getChainDB) + version , hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng , hKeepAliveServer = \_version _peer -> keepAliveServer , hPeerSharingClient = \_version controlMessageSTM _peer -> peerSharingClient controlMessageSTM @@ -304,7 +342,7 @@ mkHandlers -------------------------------------------------------------------------------} -- | Node-to-node protocol codecs needed to run 'Handlers'. -data Codecs blk addr e m bCS bSCS bBF bSBF bTX bKA bPS = Codecs +data Codecs blk addr e m bCS bSCS bBF bSBF bTX bPCD bKA bPS = Codecs { cChainSyncCodec :: Codec (ChainSync (Header blk) (Point blk) (Tip blk)) e m bCS , cChainSyncCodecSerialised :: Codec (ChainSync (SerialisedHeader blk) (Point blk) (Tip blk)) e m bSCS @@ -312,6 +350,7 @@ data Codecs blk addr e m bCS bSCS bBF bSBF bTX bKA bPS = Codecs , cBlockFetchCodecSerialised :: Codec (BlockFetch (Serialised blk) (Point blk)) e m bSBF , cTxSubmission2Codec :: Codec (TxSubmission2 (GenTxId blk) (GenTx blk)) e m bTX + , cPerasCertDiffusionCodec :: Codec (PerasCertDiffusion blk) e m bPCD , cKeepAliveCodec :: Codec KeepAlive e m bKA , cPeerSharingCodec :: Codec (PeerSharing addr) e m bPS } @@ -339,49 +378,53 @@ defaultCodecs :: ByteString ByteString ByteString + ByteString defaultCodecs ccfg version encAddr decAddr nodeToNodeVersion = Codecs { cChainSyncCodec = codecChainSync enc dec - (encodePoint (encodeRawHash p)) - (decodePoint (decodeRawHash p)) - (encodeTip (encodeRawHash p)) - (decodeTip (decodeRawHash p)) + enc + dec + enc + dec , cChainSyncCodecSerialised = codecChainSync enc dec - (encodePoint (encodeRawHash p)) - (decodePoint (decodeRawHash p)) - (encodeTip (encodeRawHash p)) - (decodeTip (decodeRawHash p)) + enc + dec + enc + dec , cBlockFetchCodec = codecBlockFetch enc dec - (encodePoint (encodeRawHash p)) - (decodePoint (decodeRawHash p)) + enc + dec , cBlockFetchCodecSerialised = codecBlockFetch enc dec - (encodePoint (encodeRawHash p)) - (decodePoint (decodeRawHash p)) + enc + dec , cTxSubmission2Codec = codecTxSubmission2 enc dec enc dec + , cPerasCertDiffusionCodec = + codecObjectDiffusion + enc + dec + enc + dec , cKeepAliveCodec = codecKeepAlive_v2 , cPeerSharingCodec = codecPeerSharing (encAddr nodeToNodeVersion) (decAddr nodeToNodeVersion) } where - p :: Proxy blk - p = Proxy - enc :: SerialiseNodeToNode blk a => a -> Encoding enc = encodeNodeToNode ccfg version @@ -401,6 +444,7 @@ identityCodecs :: (AnyMessage (BlockFetch blk (Point blk))) (AnyMessage (BlockFetch (Serialised blk) (Point blk))) (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) + (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing addr)) identityCodecs = @@ -410,6 +454,7 @@ identityCodecs = , cBlockFetchCodec = codecBlockFetchId , cBlockFetchCodecSerialised = codecBlockFetchId , cTxSubmission2Codec = codecTxSubmission2Id + , cPerasCertDiffusionCodec = codecObjectDiffusionId , cKeepAliveCodec = codecKeepAliveId , cPeerSharingCodec = codecPeerSharingId } @@ -432,6 +477,7 @@ data Tracers' peer ntnAddr blk e f = Tracers f (TraceLabelPeer peer (TraceSendRecv (BlockFetch (Serialised blk) (Point blk)))) , tTxSubmission2Tracer :: f (TraceLabelPeer peer (TraceSendRecv (TxSubmission2 (GenTxId blk) (GenTx blk)))) + , tPerasCertDiffusionTracer :: f (TraceLabelPeer peer (TraceSendRecv (PerasCertDiffusion blk))) , tKeepAliveTracer :: f (TraceLabelPeer peer (TraceSendRecv KeepAlive)) , tPeerSharingTracer :: f (TraceLabelPeer peer (TraceSendRecv (PeerSharing ntnAddr))) } @@ -444,6 +490,7 @@ instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer ntnAddr blk e f , tBlockFetchTracer = f tBlockFetchTracer , tBlockFetchSerialisedTracer = f tBlockFetchSerialisedTracer , tTxSubmission2Tracer = f tTxSubmission2Tracer + , tPerasCertDiffusionTracer = f tPerasCertDiffusionTracer , tKeepAliveTracer = f tKeepAliveTracer , tPeerSharingTracer = f tPeerSharingTracer } @@ -464,6 +511,7 @@ nullTracers = , tBlockFetchTracer = nullTracer , tBlockFetchSerialisedTracer = nullTracer , tTxSubmission2Tracer = nullTracer + , tPerasCertDiffusionTracer = nullTracer , tKeepAliveTracer = nullTracer , tPeerSharingTracer = nullTracer } @@ -485,6 +533,7 @@ showTracers tr = , tBlockFetchTracer = showTracing tr , tBlockFetchSerialisedTracer = showTracing tr , tTxSubmission2Tracer = showTracing tr + , tPerasCertDiffusionTracer = showTracing tr , tKeepAliveTracer = showTracing tr , tPeerSharingTracer = showTracing tr } @@ -509,7 +558,7 @@ type ServerApp m addr bytes a = -- | Applications for the node-to-node protocols -- -- See 'Network.Mux.Types.MuxApplication' -data Apps m addr bCS bBF bTX bKA bPS a b = Apps +data Apps m addr bCS bBF bTX bPCD bKA bPS a b = Apps { aChainSyncClient :: ClientApp m addr bCS a -- ^ Start a chain sync client that communicates with the given upstream -- node. @@ -525,6 +574,10 @@ data Apps m addr bCS bBF bTX bKA bPS a b = Apps -- given upstream node. , aTxSubmission2Server :: ServerApp m addr bTX b -- ^ Start a transaction submission v2 server. + , aPerasCertDiffusionClient :: ClientApp m addr bPCD a + -- ^ Start a Peras cert diffusion client. + , aPerasCertDiffusionServer :: ServerApp m addr bPCD b + -- ^ Start a Peras cert diffusion server. , aKeepAliveClient :: ClientApp m addr bKA a -- ^ Start a keep-alive client. , aKeepAliveServer :: ServerApp m addr bKA b @@ -540,7 +593,7 @@ data Apps m addr bCS bBF bTX bKA bPS a b = Apps -- -- They don't depend on the instantiation of the protocol parameters (which -- block type is used, etc.), hence the use of 'RankNTypes'. -data ByteLimits bCS bBF bTX bKA bPS = ByteLimits +data ByteLimits bCS bBF bTX bPCD bKA bPS = ByteLimits { blChainSync :: forall header point tip. ProtocolSizeLimits @@ -556,6 +609,11 @@ data ByteLimits bCS bBF bTX bKA bPS = ByteLimits ProtocolSizeLimits (TxSubmission2 txid tx) bTX + , blPerasCertDiffusion :: + forall blk. + ProtocolSizeLimits + (PerasCertDiffusion blk) + bPCD , blKeepAlive :: ProtocolSizeLimits KeepAlive @@ -567,22 +625,24 @@ data ByteLimits bCS bBF bTX bKA bPS = ByteLimits bPS } -noByteLimits :: ByteLimits bCS bBF bTX bKA bPS +noByteLimits :: ByteLimits bCS bBF bTX bPCD bKA bPS noByteLimits = ByteLimits { blChainSync = byteLimitsChainSync (const 0) , blBlockFetch = byteLimitsBlockFetch (const 0) , blTxSubmission2 = byteLimitsTxSubmission2 (const 0) + , blPerasCertDiffusion = byteLimitsObjectDiffusion (const 0) , blKeepAlive = byteLimitsKeepAlive (const 0) , blPeerSharing = byteLimitsPeerSharing (const 0) } -byteLimits :: ByteLimits ByteString ByteString ByteString ByteString ByteString +byteLimits :: ByteLimits ByteString ByteString ByteString ByteString ByteString ByteString byteLimits = ByteLimits { blChainSync = byteLimitsChainSync size , blBlockFetch = byteLimitsBlockFetch size , blTxSubmission2 = byteLimitsTxSubmission2 size + , blPerasCertDiffusion = byteLimitsObjectDiffusion size , blKeepAlive = byteLimitsKeepAlive size , blPeerSharing = byteLimitsPeerSharing size } @@ -594,7 +654,7 @@ byteLimits = -- | Construct the 'NetworkApplication' for the node-to-node protocols mkApps :: - forall m addrNTN addrNTC blk e bCS bBF bTX bKA bPS. + forall m addrNTN addrNTC blk e bCS bBF bTX bPCD bKA bPS. ( IOLike m , MonadTimer m , Ord addrNTN @@ -609,8 +669,8 @@ mkApps :: NodeKernel m addrNTN addrNTC blk -> StdGen -> Tracers m addrNTN blk e -> - (NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bKA bPS) -> - ByteLimits bCS bBF bTX bKA bPS -> + (NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bPCD bKA bPS) -> + ByteLimits bCS bBF bTX bPCD bKA bPS -> -- Chain-Sync timeouts for chain-sync client (using `Header blk`) as well as -- the server (`SerialisedHeader blk`). (forall header. ProtocolTimeLimitsWithRnd (ChainSync header (Point blk) (Tip blk))) -> @@ -618,7 +678,7 @@ mkApps :: CsClient.CSJConfig -> ReportPeerMetrics m (ConnectionId addrNTN) -> Handlers m addrNTN blk -> - Apps m addrNTN bCS bBF bTX bKA bPS NodeToNodeInitiatorResult () + Apps m addrNTN bCS bBF bTX bPCD bKA bPS NodeToNodeInitiatorResult () mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucketConfig csjConfig ReportPeerMetrics{..} Handlers{..} = Apps{..} where @@ -797,6 +857,51 @@ mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucke channel (txSubmissionServerPeerPipelined (hTxSubmissionServer version them)) + aPerasCertDiffusionClient :: + NodeToNodeVersion -> + ExpandedInitiatorContext addrNTN m -> + Channel m bPCD -> + m (NodeToNodeInitiatorResult, Maybe bPCD) + aPerasCertDiffusionClient + version + ExpandedInitiatorContext + { eicConnectionId = them + , eicControlMessage = controlMessageSTM + } + channel = do + labelThisThread "PerasCertDiffusionClient" + ((), trailing) <- + runPipelinedPeerWithLimits + (TraceLabelPeer them `contramap` tPerasCertDiffusionTracer) + (cPerasCertDiffusionCodec (mkCodecs version)) + blPerasCertDiffusion + timeLimitsObjectDiffusion + channel + ( objectDiffusionInboundPeerPipelined + (hPerasCertDiffusionClient version controlMessageSTM them) + ) + return (NoInitiatorResult, trailing) + + aPerasCertDiffusionServer :: + NodeToNodeVersion -> + ResponderContext addrNTN -> + Channel m bPCD -> + m ((), Maybe bPCD) + aPerasCertDiffusionServer + version + ResponderContext{rcConnectionId = them} + channel = do + labelThisThread "PerasCertDiffusionServer" + runPeerWithLimits + (TraceLabelPeer them `contramap` tPerasCertDiffusionTracer) + (cPerasCertDiffusionCodec (mkCodecs version)) + blPerasCertDiffusion + timeLimitsObjectDiffusion + channel + ( objectDiffusionOutboundPeer + (hPerasCertDiffusionServer version them) + ) + aKeepAliveClient :: NodeToNodeVersion -> ExpandedInitiatorContext addrNTN m -> @@ -900,10 +1005,11 @@ initiator :: MiniProtocolParameters -> NodeToNodeVersion -> NodeToNodeVersionData -> - Apps m addr b b b b b a c -> + Apps m addr b b b b b b a c -> OuroborosBundleWithExpandedCtx 'Mux.InitiatorMode addr b m a Void initiator miniProtocolParameters version versionData Apps{..} = nodeToNodeProtocols + Set.empty -- TODO: change for a meaningful value miniProtocolParameters -- TODO: currently consensus is using 'ConnectionId' for its 'peer' type. -- This is currently ok, as we might accept multiple connections from the @@ -918,6 +1024,9 @@ initiator miniProtocolParameters version versionData Apps{..} = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aBlockFetchClient version ctx))) , txSubmissionProtocol = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aTxSubmission2Client version ctx))) + , perasCertDiffusionProtocol = + (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aPerasCertDiffusionClient version ctx))) + , perasVoteDiffusionProtocol = error "perasVoteDiffusionProtocol not implemented" , keepAliveProtocol = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aKeepAliveClient version ctx))) , peerSharingProtocol = @@ -936,10 +1045,11 @@ initiatorAndResponder :: MiniProtocolParameters -> NodeToNodeVersion -> NodeToNodeVersionData -> - Apps m addr b b b b b a c -> + Apps m addr b b b b b b a c -> OuroborosBundleWithExpandedCtx 'Mux.InitiatorResponderMode addr b m a c initiatorAndResponder miniProtocolParameters version versionData Apps{..} = nodeToNodeProtocols + Set.empty -- TODO: change for a meaningful value miniProtocolParameters ( NodeToNodeProtocols { chainSyncProtocol = @@ -957,6 +1067,12 @@ initiatorAndResponder miniProtocolParameters version versionData Apps{..} = (MiniProtocolCb (\initiatorCtx -> aTxSubmission2Client version initiatorCtx)) (MiniProtocolCb (\responderCtx -> aTxSubmission2Server version responderCtx)) ) + , perasCertDiffusionProtocol = + ( InitiatorAndResponderProtocol + (MiniProtocolCb (\initiatorCtx -> aPerasCertDiffusionClient version initiatorCtx)) + (MiniProtocolCb (\responderCtx -> aPerasCertDiffusionServer version responderCtx)) + ) + , perasVoteDiffusionProtocol = error "perasVoteDiffusionProtocol not implemented" , keepAliveProtocol = ( InitiatorAndResponderProtocol (MiniProtocolCb (\initiatorCtx -> aKeepAliveClient version initiatorCtx)) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index efd7018046..aa51e178da 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -649,6 +649,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = ByteString ByteString ByteString + ByteString NodeToNodeInitiatorResult () mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics encAddrNTN decAddrNTN version = @@ -690,6 +691,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = ByteString ByteString ByteString + ByteString NodeToNodeInitiatorResult () ) -> diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs index 3d025ea91d..4b8627dbd9 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs @@ -42,6 +42,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Server import Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server ( TraceLocalTxSubmissionServerEvent (..) ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert import Ouroboros.Consensus.Node.GSM (TraceGsmEvent) import Ouroboros.Consensus.Protocol.Praos.AgentClient ( KESAgentClientTrace (..) @@ -79,6 +80,10 @@ data Tracers' remotePeer localPeer blk f = Tracers f (TraceLabelPeer remotePeer (TraceTxSubmissionOutbound (GenTxId blk) (GenTx blk))) , localTxSubmissionServerTracer :: f (TraceLocalTxSubmissionServerEvent blk) , mempoolTracer :: f (TraceEventMempool blk) + , perasCertDiffusionInboundTracer :: + f (TraceLabelPeer remotePeer (TracePerasCertDiffusionInbound blk)) + , perasCertDiffusionOutboundTracer :: + f (TraceLabelPeer remotePeer (TracePerasCertDiffusionOutbound blk)) , forgeTracer :: f (TraceLabelCreds (TraceForgeEvent blk)) , blockchainTimeTracer :: f (TraceBlockchainTimeEvent UTCTime) , forgeStateInfoTracer :: f (TraceLabelCreds (ForgeStateInfo blk)) @@ -109,6 +114,8 @@ instance , txOutboundTracer = f txOutboundTracer , localTxSubmissionServerTracer = f localTxSubmissionServerTracer , mempoolTracer = f mempoolTracer + , perasCertDiffusionInboundTracer = f perasCertDiffusionInboundTracer + , perasCertDiffusionOutboundTracer = f perasCertDiffusionOutboundTracer , forgeTracer = f forgeTracer , blockchainTimeTracer = f blockchainTimeTracer , forgeStateInfoTracer = f forgeStateInfoTracer @@ -146,6 +153,8 @@ nullTracers = , txOutboundTracer = nullTracer , localTxSubmissionServerTracer = nullTracer , mempoolTracer = nullTracer + , perasCertDiffusionInboundTracer = nullTracer + , perasCertDiffusionOutboundTracer = nullTracer , forgeTracer = nullTracer , blockchainTimeTracer = nullTracer , forgeStateInfoTracer = nullTracer @@ -185,6 +194,8 @@ showTracers tr = , txOutboundTracer = showTracing tr , localTxSubmissionServerTracer = showTracing tr , mempoolTracer = showTracing tr + , perasCertDiffusionInboundTracer = showTracing tr + , perasCertDiffusionOutboundTracer = showTracing tr , forgeTracer = showTracing tr , blockchainTimeTracer = showTracing tr , forgeStateInfoTracer = showTracing tr diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index 847659c2ad..44f13dbfe8 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -83,6 +83,7 @@ import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck as HistoricityCheck import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert (PerasCertDiffusion) import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.ExitPolicy import qualified Ouroboros.Consensus.Node.GSM as GSM @@ -1182,6 +1183,7 @@ runThreadNetwork Lazy.ByteString Lazy.ByteString (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) + (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing NodeId)) customNodeToNodeCodecs cfg ntnVersion = @@ -1201,6 +1203,9 @@ runThreadNetwork , cTxSubmission2Codec = mapFailureCodec CodecIdFailure $ NTN.cTxSubmission2Codec NTN.identityCodecs + , cPerasCertDiffusionCodec = + mapFailureCodec CodecIdFailure $ + NTN.cPerasCertDiffusionCodec NTN.identityCodecs , cKeepAliveCodec = mapFailureCodec CodecIdFailure $ NTN.cKeepAliveCodec NTN.identityCodecs @@ -1791,6 +1796,7 @@ type LimitedApp' m addr blk = Lazy.ByteString Lazy.ByteString (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) + (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing addr)) NodeToNodeInitiatorResult diff --git a/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs b/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs index a455689110..34d0c567a9 100644 --- a/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs +++ b/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs @@ -103,6 +103,7 @@ prop_simple_bft_convergence , version = newestVersion (Proxy @MockBftBlock) } + testOutput :: TestOutput MockBftBlock testOutput = runTestNetwork testConfig From 59519d9a3c4d0ccb04e1eb3dcd06dfff1fc8fb30 Mon Sep 17 00:00:00 2001 From: Agustin Mista Date: Thu, 9 Oct 2025 14:30:15 +0200 Subject: [PATCH 7/8] Propagate feature flags down to NodeKernelArgs Brings in cardano-base and propagates a set of `CardanoFeatureFlag`s from the top-level `RunNodeArgs` down to the `NodeKernelArgs`. This is currently needed by an upcoming PR to the GSM to distinguish whether having an established PerasCertDiffusion connection with a given peer is necessary or not when trying to decide if such peer is idling. --- cabal.project | 2 +- flake.lock | 6 +++--- .../ouroboros-consensus-diffusion.cabal | 1 + .../Ouroboros/Consensus/Node.hs | 13 +++++++++++++ .../Ouroboros/Consensus/NodeKernel.hs | 3 +++ .../Test/ThreadNet/Network.hs | 1 + 6 files changed, 22 insertions(+), 4 deletions(-) diff --git a/cabal.project b/cabal.project index d468545b24..0072c1fba0 100644 --- a/cabal.project +++ b/cabal.project @@ -16,7 +16,7 @@ index-state: -- Bump this if you need newer packages from Hackage , hackage.haskell.org 2025-09-26T20:57:57Z -- Bump this if you need newer packages from CHaP - , cardano-haskell-packages 2025-10-01T14:54:25Z + , cardano-haskell-packages 2025-10-07T11:20:00Z packages: ouroboros-consensus diff --git a/flake.lock b/flake.lock index acef080678..4feaa39476 100644 --- a/flake.lock +++ b/flake.lock @@ -3,11 +3,11 @@ "CHaP": { "flake": false, "locked": { - "lastModified": 1759339316, - "narHash": "sha256-SW/K9yfhNLNCDAl2ZC8ol0w8X+AwyLin0XOvnn50468=", + "lastModified": 1759837865, + "narHash": "sha256-g8SMcVN1v51Muz6a+xJkB92mPx1jsg+sjHKvQ3Wj/jY=", "owner": "intersectmbo", "repo": "cardano-haskell-packages", - "rev": "aa50d6dffede91c8fdfcef94c71641a00214522a", + "rev": "9a46cacd941c108492cd4cee5d29735e8cd8ee65", "type": "github" }, "original": { diff --git a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal index f1c5b42fcd..0caddc634b 100644 --- a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal +++ b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal @@ -77,6 +77,7 @@ library build-depends: base >=4.14 && <4.22, bytestring >=0.10 && <0.13, + cardano-base, cardano-slotting, cborg ^>=0.2.2, containers >=0.5 && <0.8, diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index aa51e178da..b5036ec50e 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -60,6 +60,7 @@ module Ouroboros.Consensus.Node , openChainDB ) where +import Cardano.Base.FeatureFlags (CardanoFeatureFlag) import qualified Cardano.Network.Diffusion as Cardano.Diffusion import Cardano.Network.Diffusion.Configuration (ChainSyncIdleTimeout (..)) import qualified Cardano.Network.Diffusion.Policies as Cardano.Diffusion @@ -84,6 +85,7 @@ import Data.Kind (Type) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe, isNothing) +import Data.Set (Set) import Data.Time (NominalDiffTime) import Data.Typeable (Typeable) import Ouroboros.Consensus.Block @@ -232,6 +234,8 @@ data RunNodeArgs m addrNTN addrNTC blk = RunNodeArgs -- ^ Network PeerSharing miniprotocol willingness flag , rnGetUseBootstrapPeers :: STM m UseBootstrapPeers , rnGenesisConfig :: GenesisConfig + , rnFeatureFlags :: Set CardanoFeatureFlag + -- ^ Enabled experimental features } -- | Arguments that usually only tests /directly/ specify. @@ -319,6 +323,8 @@ data LowLevelRunNodeArgs m addrNTN addrNTC blk , llrnPublicPeerSelectionStateVar :: StrictSTM.StrictTVar m (PublicPeerSelectionState addrNTN) , llrnLdbFlavorArgs :: LedgerDbBackendArgs m blk -- ^ The flavor arguments + , llrnFeatureFlags :: Set CardanoFeatureFlag + -- ^ Enabled experimental features } data NodeDatabasePaths @@ -570,6 +576,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = gsmAntiThunderingHerd keepAliveRng cfg + llrnFeatureFlags rnTraceConsensus btime (InFutureCheck.realHeaderInFutureCheck llrnMaxClockSkew systemTime) @@ -847,6 +854,7 @@ mkNodeKernelArgs :: StdGen -> StdGen -> TopLevelConfig blk -> + Set CardanoFeatureFlag -> Tracers m (ConnectionId addrNTN) (ConnectionId addrNTC) blk -> BlockchainTime m -> InFutureCheck.SomeHeaderInFutureCheck m blk -> @@ -866,6 +874,7 @@ mkNodeKernelArgs gsmAntiThunderingHerd rng cfg + featureFlags tracers btime chainSyncFutureCheck @@ -885,6 +894,7 @@ mkNodeKernelArgs { tracers , registry , cfg + , featureFlags , btime , chainDB , initChainDB = nodeInitChainDB @@ -1003,6 +1013,7 @@ stdLowLevelRunNodeArgsIO { rnProtocolInfo , rnPeerSharing , rnGenesisConfig + , rnFeatureFlags } $(SafeWildCards.fields 'StdRunNodeArgs) = do llrnBfcSalt <- stdBfcSaltIO @@ -1052,6 +1063,8 @@ stdLowLevelRunNodeArgsIO , llrnPublicPeerSelectionStateVar = Diffusion.dcPublicPeerSelectionVar srnDiffusionConfiguration , llrnLdbFlavorArgs = srnLedgerDbBackendArgs + , llrnFeatureFlags = + rnFeatureFlags } where networkMagic :: NetworkMagic diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index 1c45c68155..36ba2cbcf2 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -27,6 +27,7 @@ module Ouroboros.Consensus.NodeKernel , toConsensusMode ) where +import Cardano.Base.FeatureFlags (CardanoFeatureFlag) import Cardano.Network.ConsensusMode (ConsensusMode (..)) import Cardano.Network.PeerSelection.Bootstrap (UseBootstrapPeers) import Cardano.Network.PeerSelection.LocalRootPeers @@ -51,6 +52,7 @@ import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as NE import Data.Maybe (isJust, mapMaybe) import Data.Proxy +import Data.Set (Set) import qualified Data.Text as Text import Data.Void (Void) import Ouroboros.Consensus.Block hiding (blockMatchesHeader) @@ -195,6 +197,7 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs { tracers :: Tracers m (ConnectionId addrNTN) addrNTC blk , registry :: ResourceRegistry m , cfg :: TopLevelConfig blk + , featureFlags :: Set CardanoFeatureFlag , btime :: BlockchainTime m , chainDB :: ChainDB m blk , initChainDB :: StorageConfig blk -> InitChainDB m blk -> m () diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index 44f13dbfe8..6bd6d9bde4 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -1045,6 +1045,7 @@ runThreadNetwork { tracers , registry , cfg = pInfoConfig + , featureFlags = mempty , btime , chainDB , initChainDB = nodeInitChainDB From bef68f56cd65070e02b85915808744d0c8a1d486 Mon Sep 17 00:00:00 2001 From: Thomas BAGREL Date: Thu, 18 Sep 2025 10:51:35 +0200 Subject: [PATCH 8/8] Add changelog entry Co-authored-by: Agustin Mista Co-authored-by: Alexander Esgen Co-authored-by: Georgy Lukyanov Co-authored-by: Thomas BAGREL Co-authored-by: Nicolas BACQUEY Co-authored-by: Nicolas "Niols" Jeannerod --- ...9_095938_thomas.bagrel_object_diffusion.md | 23 +++++++++++++++ ...9_095930_thomas.bagrel_object_diffusion.md | 28 +++++++++++++++++++ ...8_104810_thomas.bagrel_object_diffusion.md | 28 +++++++++++++++++++ 3 files changed, 79 insertions(+) create mode 100644 ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md create mode 100644 ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md create mode 100644 ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md diff --git a/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md b/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md new file mode 100644 index 0000000000..9efe00d438 --- /dev/null +++ b/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md @@ -0,0 +1,23 @@ + + + + + +### Breaking + +- Added support for `NodeToNodeV_16` diff --git a/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md b/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md new file mode 100644 index 0000000000..fc67d834c1 --- /dev/null +++ b/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md @@ -0,0 +1,28 @@ + + + + + +### Breaking + +- Modify `Ouroboros.Consensus{.Node,.Node.Tracer,.Network.NodeToNode}` to wire-in PerasCertDiffusion similarly to other mini-protocols (e.g. TX-submission) +- Propagate feature flags down to `NodeKernelArgs` + +### Non-Breaking + +- Update `Test.ThreadNet.Network` in `unstable-diffusion-testlib` accordingly to the changes made in `Ouroboros.Consensus.Network.NodeToNode` diff --git a/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md b/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md new file mode 100644 index 0000000000..fba0bcc16c --- /dev/null +++ b/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md @@ -0,0 +1,28 @@ + + + + + +### Breaking + +- Relies on a new version of `ouroboros-network` with support for ObjectDiffusion mini-protocol +- Added modules `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion{.Inbound,.Outbound}` with implementations of the ObjectDiffusion protocol (quite similar/inspired from TX-submission, except that client = inbound, server = outbound) +- Added module `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API` defining `ObjectPool{Reader,Writer}` interfaces, through which ObjectDiffusion accesses/stores the objects to send/that have been received. +- Added modules `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert` and `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert` containing definitions specific to `PerasCert` diffusion through the ObjectDiffusion mini-protocol +- Modifies `Ouroboros.Consensus.Node.Serialisation` to add CBOR serialisation (`SerialiseNodeToNode`) for `Point blk`, `Tip blk`, and `PerasCert blk` +- Added modules `Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke` and `Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke` with smoke tests for the general ObjectDiffusion mini-protocol and for the `PerasCert`-specific instance of it