diff --git a/cabal.project b/cabal.project index 28096e3a171..1b2421f5a76 100644 --- a/cabal.project +++ b/cabal.project @@ -18,7 +18,7 @@ index-state: , hackage.haskell.org 2025-08-05T15:28:56Z -- Bump this if you need newer packages from CHaP - , cardano-haskell-packages 2025-03-18T17:41:11Z + , cardano-haskell-packages 2025-09-11T16:20:37Z packages: ./cardano-ping ./monoidal-synchronisation diff --git a/flake.lock b/flake.lock index 617f1807e9e..ad68494d760 100644 --- a/flake.lock +++ b/flake.lock @@ -3,11 +3,11 @@ "CHaP": { "flake": false, "locked": { - "lastModified": 1757088431, - "narHash": "sha256-yUv1JB7WOjoVWhEfk8cKap1P9QDn4hLd4ZHdkNoqvuY=", + "lastModified": 1757610108, + "narHash": "sha256-dL7bPSjuG9e4umn2SMXOgCWVbXirC9duhPWmsyAlj50=", "owner": "intersectmbo", "repo": "cardano-haskell-packages", - "rev": "8e043cb654d69e62bfb59b80afb2ddda8481f6f7", + "rev": "97e199b88b3a0f6b9e404bb4994d4e255240a127", "type": "github" }, "original": { diff --git a/ouroboros-network-api/changelog.d/20250929_093633_alexander.esgen_peras_diffusion.md b/ouroboros-network-api/changelog.d/20250929_093633_alexander.esgen_peras_diffusion.md new file mode 100644 index 00000000000..27de1177573 --- /dev/null +++ b/ouroboros-network-api/changelog.d/20250929_093633_alexander.esgen_peras_diffusion.md @@ -0,0 +1,7 @@ +### Breaking + +- Added `NodeToNodeV_16`. + +### Non-Breaking + +- Added `isPerasEnabled` predicate. diff --git a/ouroboros-network-api/ouroboros-network-api.cabal b/ouroboros-network-api/ouroboros-network-api.cabal index 6ec413aad56..f85ce9a4f82 100644 --- a/ouroboros-network-api/ouroboros-network-api.cabal +++ b/ouroboros-network-api/ouroboros-network-api.cabal @@ -59,6 +59,7 @@ library base >=4.14 && <4.22, base16-bytestring, bytestring >=0.10 && <0.13, + cardano-base, cardano-binary, cardano-slotting, cardano-strict-containers, diff --git a/ouroboros-network-api/src/Ouroboros/Network/NodeToNode/Version.hs b/ouroboros-network-api/src/Ouroboros/Network/NodeToNode/Version.hs index cfac201106d..684865c40fe 100644 --- a/ouroboros-network-api/src/Ouroboros/Network/NodeToNode/Version.hs +++ b/ouroboros-network-api/src/Ouroboros/Network/NodeToNode/Version.hs @@ -10,13 +10,18 @@ module Ouroboros.Network.NodeToNode.Version , ConnectionMode (..) , nodeToNodeVersionCodec , nodeToNodeCodecCBORTerm + -- * Feature predicates + , isPerasEnabled ) where +import Data.Set (Set) +import Data.Set qualified as Set import Data.Text (Text) import Data.Text qualified as T import Codec.CBOR.Term qualified as CBOR +import Cardano.Base.FeatureFlags import Control.DeepSeq import GHC.Generics import NoThunks.Class (NoThunks) @@ -71,6 +76,10 @@ data NodeToNodeVersion = -- ^ Plomin HF, mandatory on mainnet as of 2025.01.29 | NodeToNodeV_15 -- ^ SRV support + | NodeToNodeV_16 + -- ^ Experimental. + -- + -- Adds Peras mini-protocols (if 'PerasFlag' is set). deriving (Eq, Ord, Enum, Bounded, Show, Generic, NFData, NoThunks) nodeToNodeVersionCodec :: CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion @@ -78,9 +87,11 @@ nodeToNodeVersionCodec = CodecCBORTerm { encodeTerm, decodeTerm } where encodeTerm NodeToNodeV_14 = CBOR.TInt 14 encodeTerm NodeToNodeV_15 = CBOR.TInt 15 + encodeTerm NodeToNodeV_16 = CBOR.TInt 16 decodeTerm (CBOR.TInt 14) = Right NodeToNodeV_14 decodeTerm (CBOR.TInt 15) = Right NodeToNodeV_15 + decodeTerm (CBOR.TInt 16) = Right NodeToNodeV_16 decodeTerm (CBOR.TInt n) = Left ( T.pack "decode NodeToNodeVersion: unknown tag: " <> T.pack (show n) , Just n @@ -148,6 +159,7 @@ nodeToNodeCodecCBORTerm = \case NodeToNodeV_14 -> codec NodeToNodeV_15 -> codec + NodeToNodeV_16 -> codec where codec = CodecCBORTerm { encodeTerm = encodeTerm, decodeTerm = decodeTerm } @@ -190,3 +202,8 @@ nodeToNodeCodecCBORTerm = data ConnectionMode = UnidirectionalMode | DuplexMode + +isPerasEnabled :: Set CardanoFeatureFlag -> NodeToNodeVersion -> Bool +isPerasEnabled featureFlags v = + Set.member PerasFlag featureFlags + && v >= NodeToNodeV_16 diff --git a/ouroboros-network-protocols/cddl/specs/handshake-node-to-node-v14.cddl b/ouroboros-network-protocols/cddl/specs/handshake-node-to-node-v14.cddl index f03cd7dd89b..fa4246c41cc 100644 --- a/ouroboros-network-protocols/cddl/specs/handshake-node-to-node-v14.cddl +++ b/ouroboros-network-protocols/cddl/specs/handshake-node-to-node-v14.cddl @@ -15,7 +15,7 @@ msgQueryReply = [3, versionTable] ; The codec only accepts definite-length maps. versionTable = { * versionNumber_v14 => v14.nodeToNodeVersionData } -versionNumber_v14 = 14 / 15 +versionNumber_v14 = 14 / 15 / 16 ; All version numbers versionNumbers = versionNumber_v14 diff --git a/ouroboros-network-protocols/changelog.d/20250929_093859_alexander.esgen_peras_diffusion.md b/ouroboros-network-protocols/changelog.d/20250929_093859_alexander.esgen_peras_diffusion.md new file mode 100644 index 00000000000..0ffdd23e3e9 --- /dev/null +++ b/ouroboros-network-protocols/changelog.d/20250929_093859_alexander.esgen_peras_diffusion.md @@ -0,0 +1,3 @@ +### Non-Breaking + +- Added ObjectDiffusion mini-protocol for Ouroboros Peras. diff --git a/ouroboros-network-protocols/ouroboros-network-protocols.cabal b/ouroboros-network-protocols/ouroboros-network-protocols.cabal index a3e81f74ba9..78a1f161dea 100644 --- a/ouroboros-network-protocols/ouroboros-network-protocols.cabal +++ b/ouroboros-network-protocols/ouroboros-network-protocols.cabal @@ -62,6 +62,10 @@ library Ouroboros.Network.Protocol.LocalTxSubmission.Codec Ouroboros.Network.Protocol.LocalTxSubmission.Server Ouroboros.Network.Protocol.LocalTxSubmission.Type + Ouroboros.Network.Protocol.ObjectDiffusion.Codec + Ouroboros.Network.Protocol.ObjectDiffusion.Inbound + Ouroboros.Network.Protocol.ObjectDiffusion.Outbound + Ouroboros.Network.Protocol.ObjectDiffusion.Type Ouroboros.Network.Protocol.PeerSharing.Client Ouroboros.Network.Protocol.PeerSharing.Codec Ouroboros.Network.Protocol.PeerSharing.Server @@ -161,6 +165,7 @@ library testlib Ouroboros.Network.Protocol.LocalTxSubmission.Direct Ouroboros.Network.Protocol.LocalTxSubmission.Examples Ouroboros.Network.Protocol.LocalTxSubmission.Test + Ouroboros.Network.Protocol.ObjectDiffusion.Test Ouroboros.Network.Protocol.PeerSharing.Codec.CDDL Ouroboros.Network.Protocol.PeerSharing.Direct Ouroboros.Network.Protocol.PeerSharing.Examples diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Codec.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Codec.hs new file mode 100644 index 00000000000..cd9ffc36b82 --- /dev/null +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Codec.hs @@ -0,0 +1,287 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeFamilies #-} + +module Ouroboros.Network.Protocol.ObjectDiffusion.Codec + ( codecObjectDiffusion + , codecObjectDiffusionId + , byteLimitsObjectDiffusion + , timeLimitsObjectDiffusion + ) where + +import Codec.CBOR.Decoding qualified as CBOR +import Codec.CBOR.Encoding qualified as CBOR +import Codec.CBOR.Read qualified as CBOR +import Control.Monad.Class.MonadST +import Control.Monad.Class.MonadTime.SI +import Data.ByteString.Lazy (ByteString) +import Data.Kind (Type) +import Data.List.NonEmpty qualified as NonEmpty +import Network.TypedProtocol.Codec.CBOR +import Ouroboros.Network.Protocol.Limits +import Ouroboros.Network.Protocol.ObjectDiffusion.Type +import Text.Printf + +-- | Byte Limits. +byteLimitsObjectDiffusion + :: forall bytes objectId object. + (bytes -> Word) + -> ProtocolSizeLimits (ObjectDiffusion objectId object) bytes +byteLimitsObjectDiffusion = ProtocolSizeLimits stateToLimit + where + stateToLimit + :: forall (st :: ObjectDiffusion objectId object). + (ActiveState st) + => StateToken st + -> Word + stateToLimit SingInit = smallByteLimit + stateToLimit (SingObjectIds SingBlocking) = largeByteLimit + stateToLimit (SingObjectIds SingNonBlocking) = largeByteLimit + stateToLimit SingObjects = largeByteLimit + stateToLimit SingIdle = smallByteLimit + stateToLimit a@SingDone = notActiveState a + +-- | 'ObjectDiffusion' time limits. +-- +-- +---------------------------------+---------------+ +-- | 'ObjectDiffusion' state | timeout (s) | +-- +=================================+===============+ +-- | `StInit` | `waitForever` | +-- +---------------------------------+---------------+ +-- | `StIdle` | `waitForever` | +-- +---------------------------------+---------------+ +-- | @'StObjectIds' 'StBlocking'@ | `waitForever` | +-- +---------------------------------+---------------+ +-- | @'StObjectIds' 'StNonBlocking'@ | `shortWait` | +-- +---------------------------------+---------------+ +-- | `StObjects` | `shortWait` | +-- +---------------------------------+---------------+ +timeLimitsObjectDiffusion + :: forall (objectId :: Type) (object :: Type). + ProtocolTimeLimits (ObjectDiffusion objectId object) +timeLimitsObjectDiffusion = ProtocolTimeLimits stateToLimit + where + stateToLimit + :: forall (st :: ObjectDiffusion objectId object). + (ActiveState st) + => StateToken st + -> Maybe DiffTime + stateToLimit SingInit = waitForever + stateToLimit (SingObjectIds SingBlocking) = waitForever + stateToLimit (SingObjectIds SingNonBlocking) = shortWait + stateToLimit SingObjects = shortWait + stateToLimit SingIdle = waitForever + stateToLimit a@SingDone = notActiveState a + +codecObjectDiffusion + :: forall (objectId :: Type) (object :: Type) m. + (MonadST m) + => (objectId -> CBOR.Encoding) -- ^ encode 'objectId' + -> (forall s. CBOR.Decoder s objectId) -- ^ decode 'objectId' + -> (object -> CBOR.Encoding) -- ^ encode object + -> (forall s. CBOR.Decoder s object) -- ^ decode object + -> Codec (ObjectDiffusion objectId object) CBOR.DeserialiseFailure m ByteString +codecObjectDiffusion encodeObjectId decodeObjectId encodeObject decodeObject = + mkCodecCborLazyBS + (encodeObjectDiffusion encodeObjectId encodeObject) + decode + where + decode + :: forall (st :: ObjectDiffusion objectId object). + (ActiveState st) + => StateToken st + -> forall s. CBOR.Decoder s (SomeMessage st) + decode stok = do + len <- CBOR.decodeListLen + key <- CBOR.decodeWord + decodeObjectDiffusion decodeObjectId decodeObject stok len key + +encodeObjectDiffusion + :: forall (objectId :: Type) (object :: Type) + (st :: ObjectDiffusion objectId object) + (st' :: ObjectDiffusion objectId object). + (objectId -> CBOR.Encoding) -- ^ encode 'objectId' + -> (object -> CBOR.Encoding) -- ^ encode 'object' + -> Message (ObjectDiffusion objectId object) st st' + -> CBOR.Encoding +encodeObjectDiffusion encodeObjectId encodeObject = encode + where + encode + :: forall st0 st1. + Message (ObjectDiffusion objectId object) st0 st1 + -> CBOR.Encoding + encode MsgInit = + CBOR.encodeListLen 1 + <> CBOR.encodeWord 0 + encode (MsgRequestObjectIds blocking (NumObjectIdsAck ackNo) (NumObjectIdsReq reqNo)) = + CBOR.encodeListLen 4 + <> CBOR.encodeWord 1 + <> CBOR.encodeBool + ( case blocking of + SingBlocking -> True + SingNonBlocking -> False + ) + <> CBOR.encodeWord16 ackNo + <> CBOR.encodeWord16 reqNo + encode (MsgReplyObjectIds objIds) = + CBOR.encodeListLen 2 + <> CBOR.encodeWord 2 + <> CBOR.encodeListLenIndef + <> foldr (\objId r -> encodeObjectId objId <> r) CBOR.encodeBreak objIds' + where + objIds' :: [objectId] + objIds' = case objIds of + BlockingReply xs -> NonEmpty.toList xs + NonBlockingReply xs -> xs + encode (MsgRequestObjects objIds) = + CBOR.encodeListLen 2 + <> CBOR.encodeWord 3 + <> CBOR.encodeListLenIndef + <> foldr (\objId r -> encodeObjectId objId <> r) CBOR.encodeBreak objIds + encode (MsgReplyObjects objects) = + CBOR.encodeListLen 2 + <> CBOR.encodeWord 4 + <> CBOR.encodeListLenIndef + <> foldr (\objId r -> encodeObject objId <> r) CBOR.encodeBreak objects + encode MsgDone = + CBOR.encodeListLen 1 + <> CBOR.encodeWord 5 + +decodeObjectDiffusion + :: forall (objectId :: Type) (object :: Type) + (st :: ObjectDiffusion objectId object) s. + (ActiveState st) + => (forall s'. CBOR.Decoder s' objectId) -- ^ decode 'objectId' + -> (forall s'. CBOR.Decoder s' object) -- ^ decode object + -> StateToken st + -> Int + -> Word + -> CBOR.Decoder s (SomeMessage st) +decodeObjectDiffusion decodeObjectId decodeObject = decode + where + decode + :: forall (st' :: ObjectDiffusion objectId object). + (ActiveState st') + => StateToken st' + -> Int + -> Word + -> CBOR.Decoder s (SomeMessage st') + decode stok len key = do + case (stok, len, key) of + (SingInit, 1, 0) -> + return $ SomeMessage MsgInit + (SingIdle, 4, 1) -> do + blocking <- CBOR.decodeBool + ackNo <- NumObjectIdsAck <$> CBOR.decodeWord16 + reqNo <- NumObjectIdsReq <$> CBOR.decodeWord16 + return $! if blocking + then SomeMessage $ MsgRequestObjectIds SingBlocking ackNo reqNo + else SomeMessage $ MsgRequestObjectIds SingNonBlocking ackNo reqNo + (SingObjectIds b, 2, 2) -> do + CBOR.decodeListLenIndef + objIds <- CBOR.decodeSequenceLenIndef + (flip (:)) + [] + reverse + decodeObjectId + case (b, objIds) of + (SingBlocking, t : ts) -> + return + $ SomeMessage + $ MsgReplyObjectIds (BlockingReply (t NonEmpty.:| ts)) + (SingNonBlocking, ts) -> + return + $ SomeMessage + $ MsgReplyObjectIds (NonBlockingReply ts) + (SingBlocking, []) -> + fail "codecObjectDiffusion: MsgReplyObjectIds: empty list not permitted" + (SingIdle, 2, 3) -> do + CBOR.decodeListLenIndef + objIds <- CBOR.decodeSequenceLenIndef + (flip (:)) + [] + reverse + decodeObjectId + return $ SomeMessage $ MsgRequestObjects objIds + (SingObjects, 2, 4) -> do + CBOR.decodeListLenIndef + objIds <- CBOR.decodeSequenceLenIndef + (flip (:)) + [] + reverse + decodeObject + return $ SomeMessage $ MsgReplyObjects objIds + (SingIdle, 1, 5) -> + return $ SomeMessage MsgDone + (SingDone, _, _) -> notActiveState stok + -- failures per protocol state + (SingInit, _, _) -> + fail $ printf "codecObjectDiffusion (%s) unexpected key (%d, %d)" (show stok) key len + (SingObjectIds SingBlocking, _, _) -> + fail $ printf "codecObjectDiffusion (%s) unexpected key (%d, %d)" (show stok) key len + (SingObjectIds SingNonBlocking, _, _) -> + fail $ printf "codecObjectDiffusion (%s) unexpected key (%d, %d)" (show stok) key len + (SingObjects, _, _) -> + fail $ printf "codecObjectDiffusion (%s) unexpected key (%d, %d)" (show stok) key len + (SingIdle, _, _) -> + fail $ printf "codecObjectDiffusion (%s) unexpected key (%d, %d)" (show stok) key len + +codecObjectDiffusionId + :: forall objectId object m. + (Monad m) + => Codec + (ObjectDiffusion objectId object) + CodecFailure + m + (AnyMessage (ObjectDiffusion objectId object)) +codecObjectDiffusionId = Codec {encode, decode} + where + encode + :: forall st st'. + (ActiveState st + , StateTokenI st + ) + => Message (ObjectDiffusion objectId object) st st' + -> AnyMessage (ObjectDiffusion objectId object) + encode = AnyMessage + + decode + :: forall (st :: ObjectDiffusion objectId object). + (ActiveState st) + => StateToken st + -> m (DecodeStep + (AnyMessage (ObjectDiffusion objectId object)) + CodecFailure + m + (SomeMessage st) + ) + decode stok = return $ DecodePartial $ \bytes -> + return $ case (stok, bytes) of + (SingInit, Just (AnyMessage msg@MsgInit)) -> + DecodeDone (SomeMessage msg) Nothing + (SingIdle, Just (AnyMessage msg@(MsgRequestObjectIds SingBlocking _ _))) -> + DecodeDone (SomeMessage msg) Nothing + (SingIdle, Just (AnyMessage msg@(MsgRequestObjectIds SingNonBlocking _ _))) -> + DecodeDone (SomeMessage msg) Nothing + (SingIdle, Just (AnyMessage msg@(MsgRequestObjects {}))) -> + DecodeDone (SomeMessage msg) Nothing + (SingObjects, Just (AnyMessage msg@(MsgReplyObjects {}))) -> + DecodeDone (SomeMessage msg) Nothing + (SingObjectIds b, Just (AnyMessage msg)) -> case (b, msg) of + (SingBlocking, MsgReplyObjectIds (BlockingReply {})) -> + DecodeDone (SomeMessage msg) Nothing + (SingNonBlocking, MsgReplyObjectIds (NonBlockingReply {})) -> + DecodeDone (SomeMessage msg) Nothing + (_, _) -> + DecodeFail $ CodecFailure "codecObjectDiffusionId: no matching message" + (SingIdle, Just (AnyMessage msg@MsgDone)) -> + DecodeDone (SomeMessage msg) Nothing + (SingDone, _) -> + notActiveState stok + (_, _) -> + DecodeFail $ CodecFailure "codecObjectDiffusionId: no matching message" diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Inbound.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Inbound.hs new file mode 100644 index 00000000000..26e3200a762 --- /dev/null +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Inbound.hs @@ -0,0 +1,116 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | A view of the object diffusion protocol from the point of view of +-- the inbound/client peer. +-- +-- This provides a view that uses less complex types and should be easier to +-- use than the underlying typed protocol itself. +-- +-- For execution, a conversion into the typed protocol is provided. +module Ouroboros.Network.Protocol.ObjectDiffusion.Inbound + ( -- * Protocol type for the inbound + ObjectDiffusionInboundPipelined (..) + , InboundStIdle (..) + , Collect (..) + -- * Execution as a typed protocol + , objectDiffusionInboundPeerPipelined + ) where + +import Data.List.NonEmpty (NonEmpty) +import Network.TypedProtocol.Core +import Network.TypedProtocol.Peer (Peer, PeerPipelined (..)) +import Network.TypedProtocol.Peer.Client +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + +data ObjectDiffusionInboundPipelined objectId object m a where + ObjectDiffusionInboundPipelined + :: InboundStIdle Z objectId object m a + -> ObjectDiffusionInboundPipelined objectId object m a + +-- | This is the type of the pipelined results, collected by 'CollectPipelined'. +-- This protocol can pipeline requests for object ids and objects, +-- so we use a sum of either for collecting the responses. +data Collect objectId object + = -- | The result of 'SendMsgRequestObjectIdsPipelined'. It also carries + -- the number of objectIds originally requested. + CollectObjectIds NumObjectIdsReq [objectId] + | -- | The result of 'SendMsgRequestObjectsPipelined'. The actual reply only + -- contains the objects sent, but this pairs them up with the + -- objects requested. This is because the peer can determine that + -- some objects are no longer needed. + CollectObjects [objectId] [object] + +data InboundStIdle (n :: N) objectId object m a where + SendMsgRequestObjectIdsBlocking + :: NumObjectIdsAck -- ^ number of objectIds to acknowledge + -> NumObjectIdsReq -- ^ number of objectIds to request + -> (NonEmpty objectId -> InboundStIdle Z objectId object m a) + -> InboundStIdle Z objectId object m a + SendMsgRequestObjectIdsPipelined + :: NumObjectIdsAck + -> NumObjectIdsReq + -> InboundStIdle (S n) objectId object m a + -> InboundStIdle n objectId object m a + SendMsgRequestObjectsPipelined + :: [objectId] + -> InboundStIdle (S n) objectId object m a + -> InboundStIdle n objectId object m a + CollectPipelined + :: Maybe (InboundStIdle (S n) objectId object m a) + -> (Collect objectId object -> InboundStIdle n objectId object m a) + -> InboundStIdle (S n) objectId object m a + SendMsgDone + :: a + -> InboundStIdle Z objectId object m a + WithEffect :: m (InboundStIdle n objectId object m a) + -> InboundStIdle n objectId object m a + +inboundRun + :: forall (n :: N) objectId object m a. + (Functor m) + => InboundStIdle n objectId object m a + -> Peer (ObjectDiffusion objectId object) AsClient (Pipelined n (Collect objectId object)) StIdle m a + +inboundRun (SendMsgRequestObjectIdsBlocking ackNo reqNo k) = + Yield (MsgRequestObjectIds SingBlocking ackNo reqNo) + $ Await + $ \case + MsgReplyObjectIds (BlockingReply objectIds) -> + inboundRun (k objectIds) +inboundRun (SendMsgRequestObjectIdsPipelined ackNo reqNo k) = + YieldPipelined + (MsgRequestObjectIds SingNonBlocking ackNo reqNo) + (ReceiverAwait + $ \(MsgReplyObjectIds (NonBlockingReply objectIds)) -> + ReceiverDone (CollectObjectIds reqNo objectIds) + ) + (inboundRun k) +inboundRun (SendMsgRequestObjectsPipelined objectIds k) = + YieldPipelined + (MsgRequestObjects objectIds) + (ReceiverAwait + $ \(MsgReplyObjects objects) -> + ReceiverDone (CollectObjects objectIds objects) + ) + (inboundRun k) +inboundRun (CollectPipelined none collect) = + Collect + (inboundRun <$> none) + (inboundRun . collect) +inboundRun (SendMsgDone done) = + Yield MsgDone $ Done done +inboundRun (WithEffect mNext) = + Effect $ inboundRun <$> mNext + +-- | Transform a 'ObjectDiffusionInboundPipelined' into a 'PeerPipelined'. +objectDiffusionInboundPeerPipelined + :: forall objectId object m a. + (Functor m) + => ObjectDiffusionInboundPipelined objectId object m a + -> PeerPipelined (ObjectDiffusion objectId object) AsClient StInit m a +objectDiffusionInboundPeerPipelined (ObjectDiffusionInboundPipelined inboundSt) = + PeerPipelined $ Yield MsgInit $ inboundRun inboundSt diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Outbound.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Outbound.hs new file mode 100644 index 00000000000..8ec0a92a02c --- /dev/null +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Outbound.hs @@ -0,0 +1,111 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | A view of the object diffusion protocol from the point of view of +-- the outbound/server peer. +-- +-- This provides a view that uses less complex types and should be easier to +-- use than the underlying typed protocol itself. +-- +-- For execution, 'objectDiffusionOutboundPeer' is provided for conversion +-- into the typed protocol. +module Ouroboros.Network.Protocol.ObjectDiffusion.Outbound + ( -- * Protocol type for the outbound + ObjectDiffusionOutbound (..) + , OutboundStIdle (..) + , OutboundStObjectIds (..) + , OutboundStObjects (..) + , SingBlockingStyle (..) + , BlockingReplyList (..) + -- * Execution as a typed protocol + , objectDiffusionOutboundPeer + ) where + +import Network.TypedProtocol.Core +import Network.TypedProtocol.Peer (Peer) +import Network.TypedProtocol.Peer.Server +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + +-- | The outbound side of the object diffusion protocol. +-- +-- The peer in the outbound/server role submits objects to the peer in the +-- inbound/client role. +newtype ObjectDiffusionOutbound objectId object m a = ObjectDiffusionOutbound { + runObjectDiffusionOutbound :: m (OutboundStIdle objectId object m a) + } + +-- | In the 'StIdle' protocol state, the outbound does not have agency. Instead +-- it is waiting for: +-- +-- * a request for object ids (blocking or non-blocking) +-- * a request for a given list of objects +-- * a termination message +-- +-- It must be prepared to handle any of these. +data OutboundStIdle objectId object m a = OutboundStIdle { + recvMsgRequestObjectIds :: forall blocking. + SingBlockingStyle blocking + -> NumObjectIdsAck + -> NumObjectIdsReq + -> m (OutboundStObjectIds blocking objectId object m a), + recvMsgRequestObjects :: [objectId] + -> m (OutboundStObjects objectId object m a), + recvMsgDone :: m a + } + +data OutboundStObjectIds blocking objectId object m a where + SendMsgReplyObjectIds + :: BlockingReplyList blocking objectId + -> OutboundStIdle objectId object m a + -> OutboundStObjectIds blocking objectId object m a + +data OutboundStObjects objectId object m a where + SendMsgReplyObjects + :: [object] + -> OutboundStIdle objectId object m a + -> OutboundStObjects objectId object m a + +outboundRun + :: forall objectId object m a. + (Monad m) + => OutboundStIdle objectId object m a + -> Peer (ObjectDiffusion objectId object) AsServer NonPipelined StIdle m a +outboundRun OutboundStIdle {recvMsgRequestObjectIds, recvMsgRequestObjects, recvMsgDone} = + Await $ \case + MsgRequestObjectIds blocking ackNo reqNo -> Effect $ do + reply <- recvMsgRequestObjectIds blocking ackNo reqNo + case reply of + SendMsgReplyObjectIds objectIds k -> + -- TODO: investigate why GHC cannot infer `SingI`; it used to in + -- `coot/typed-protocols-rewrite` branch + return $ case blocking of + SingBlocking -> + Yield + (MsgReplyObjectIds objectIds) + (outboundRun k) + SingNonBlocking -> + Yield + (MsgReplyObjectIds objectIds) + (outboundRun k) + MsgRequestObjects objectIds -> Effect $ do + SendMsgReplyObjects objects k <- recvMsgRequestObjects objectIds + return $ + Yield + (MsgReplyObjects objects) + (outboundRun k) + MsgDone -> Effect $ Done <$> recvMsgDone + +-- | A non-pipelined 'Peer' representing the 'ObjectDiffusionOutbound'. +objectDiffusionOutboundPeer + :: forall objectId object m a. + (Monad m) + => ObjectDiffusionOutbound objectId object m a + -> Peer (ObjectDiffusion objectId object) AsServer NonPipelined StInit m a +objectDiffusionOutboundPeer (ObjectDiffusionOutbound outboundSt) = + Await + (\MsgInit -> Effect (outboundRun <$> outboundSt)) diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Type.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Type.hs new file mode 100644 index 00000000000..df6eb6d3ae1 --- /dev/null +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Type.hs @@ -0,0 +1,298 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveFoldable #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE StandaloneKindSignatures #-} +{-# LANGUAGE TypeFamilies #-} + +-- | The type of the object diffusion protocol. +-- +-- This is used to diffuse generic objects between nodes. +module Ouroboros.Network.Protocol.ObjectDiffusion.Type + ( ObjectDiffusion (..) + , Message (..) + , SingObjectDiffusion (..) + , BlockingReplyList (..) + , NumObjectIdsAck (..) + , NumObjectIdsReq (..) + , NumObjectsReq (..) + , NumObjectsOutstanding (..) + -- re-exports + , SingBlockingStyle (..) + , SizeInBytes (..) + , StBlockingStyle (..) + ) where + +import Control.DeepSeq (NFData (..)) +import Data.Kind (Type) +import Data.List.NonEmpty (NonEmpty) +import Data.Monoid (Sum (..)) +import Data.Singletons +import Data.Word (Word16) +import GHC.Generics (Generic) +import Network.TypedProtocol.Core +import NoThunks.Class (NoThunks (..)) +import Ouroboros.Network.Protocol.TxSubmission2.Type (SingBlockingStyle (..), + StBlockingStyle (..)) +import Ouroboros.Network.SizeInBytes (SizeInBytes (..)) +import Ouroboros.Network.Util.ShowProxy (ShowProxy (..)) +import Quiet (Quiet (..)) + +-- | The kind of the object diffusion protocol, and the types of the states in +-- the protocol state machine. +-- +-- We describe this protocol using indiscriminately the labels \"inbound\"/\"client\" +-- for the peer that is receiving objects, and \"outbound\"/\"server\" for the one +-- sending them. +type ObjectDiffusion :: Type -> Type -> Type +data ObjectDiffusion objectId object where + -- | Initial protocol message. + StInit :: ObjectDiffusion objectId object + -- | The inbound node has agency; it can either terminate, ask for object + -- identifiers or ask for objects. + -- + -- There is no timeout in this state. + StIdle :: ObjectDiffusion objectId object + -- | The outbound node has agency; it must reply with a list of object + -- identifiers that it wishes to submit. + -- + -- There are two sub-states for this, for blocking and non-blocking cases. + StObjectIds :: StBlockingStyle -> ObjectDiffusion objectId object + -- | The outbound node has agency; it must reply with the list of + -- objects. + StObjects :: ObjectDiffusion objectId object + -- | Nobody has agency; termination state. + StDone :: ObjectDiffusion objectId object + +instance ( ShowProxy objectId + , ShowProxy object + ) + => ShowProxy (ObjectDiffusion objectId object) where + showProxy _ = + concat + [ "ObjectDiffusion ", + showProxy (Proxy :: Proxy objectId), + " ", + showProxy (Proxy :: Proxy object) + ] + +instance ShowProxy (StIdle :: ObjectDiffusion objectId object) where + showProxy _ = "StIdle" + +type SingObjectDiffusion + :: ObjectDiffusion objectId object + -> Type +data SingObjectDiffusion k where + SingInit :: SingObjectDiffusion StInit + SingIdle :: SingObjectDiffusion StIdle + SingObjectIds :: SingBlockingStyle stBlocking + -> SingObjectDiffusion (StObjectIds stBlocking) + SingObjects :: SingObjectDiffusion StObjects + SingDone :: SingObjectDiffusion StDone + +deriving instance Show (SingObjectDiffusion k) + +instance StateTokenI StInit where stateToken = SingInit + +instance StateTokenI StIdle where stateToken = SingIdle + +instance (SingI stBlocking) => StateTokenI (StObjectIds stBlocking) where + stateToken = SingObjectIds sing + +instance StateTokenI StObjects where stateToken = SingObjects + +instance StateTokenI StDone where stateToken = SingDone + +newtype NumObjectIdsAck = NumObjectIdsAck {getNumObjectIdsAck :: Word16} + deriving (Eq, Ord, NFData, Generic) + deriving newtype (Num, Enum, Real, Integral, Bounded, NoThunks) + deriving (Semigroup) via (Sum Word16) + deriving (Monoid) via (Sum Word16) + deriving (Show) via (Quiet NumObjectIdsAck) + +newtype NumObjectIdsReq = NumObjectIdsReq {getNumObjectIdsReq :: Word16} + deriving (Eq, Ord, NFData, Generic) + deriving newtype (Num, Enum, Real, Integral, Bounded, NoThunks) + deriving (Semigroup) via (Sum Word16) + deriving (Monoid) via (Sum Word16) + deriving (Show) via (Quiet NumObjectIdsReq) + +newtype NumObjectsReq = NumObjectsReq {getNumObjectsReq :: Word16} + deriving (Eq, Ord, NFData, Generic) + deriving newtype (Num, Enum, Real, Integral, Bounded, NoThunks) + deriving (Semigroup) via (Sum Word16) + deriving (Monoid) via (Sum Word16) + deriving (Show) via (Quiet NumObjectsReq) + +newtype NumObjectsOutstanding = NumObjectsOutstanding {getNumObjectsOutstanding :: Word16} + deriving (Eq, Ord, NFData, Generic) + deriving newtype (Num, Enum, Real, Integral, Bounded, NoThunks) + deriving (Semigroup) via (Sum Word16) + deriving (Monoid) via (Sum Word16) + deriving (Show) via (Quiet NumObjectsOutstanding) + +-- | There are some constraints of the protocol that are not captured in the +-- types of the messages, but are documented with the messages. Violation +-- of these constraints is also a protocol error. The constraints are intended +-- to ensure that implementations are able to work in bounded space. +instance Protocol (ObjectDiffusion objectId object) where + -- | The messages in the object diffusion protocol. + -- + -- In this protocol the consumer (inbound side, client role) always + -- initiates and the producer (outbound side, server role) replies. + -- This makes it a pull based protocol where the receiver manages the + -- control flow. + -- + -- The protocol involves asking for object identifiers, and then + -- asking for objects corresponding to the identifiers of interest. + -- + -- There are two ways to ask for object identifiers, blocking and + -- non-blocking. They otherwise have the same semantics. + -- + -- The protocol maintains a notional FIFO of "outstanding" object + -- identifiers that have been provided but not yet acknowledged. Only + -- objects that are outstanding can be requested: they can be + -- requested in any order, but at most once. Object identifiers are + -- acknowledged in the same FIFO order they were provided in. The + -- acknowledgement is included in the same messages used to ask for more + -- object identifiers. + data Message (ObjectDiffusion objectId object) from to where + -- | Initial message. The payload is currently unused; the planned use case + -- is to indicate that the inbound side is only interested to receive messages + -- newer than a given indicator. + MsgInit + :: Message (ObjectDiffusion objectId object) StInit StIdle + -- | Request a list of object identifiers from the server, and confirm a + -- number of outstanding object identifiers. + -- + -- With 'TokBlocking' this is a blocking operation: the response will always + -- have at least one object identifier, and it does not expect a prompt + -- response: there is no timeout. This covers the case when there is nothing + -- else to do but wait. + -- + -- With 'TokNonBlocking' this is a non-blocking operation: the response may + -- be an empty list and this does expect a prompt response. This covers high + -- throughput use cases where we wish to pipeline, by interleaving requests + -- for additional object identifiers with requests for objects, which + -- requires these requests not block. + -- + -- The request gives the maximum number of object identifiers that can be + -- accepted in the response. This must be greater than zero in the + -- 'TokBlocking' case. In the 'TokNonBlocking' case either the numbers + -- acknowledged or the number requested __MUST__ be non-zero. In either + -- case, the number requested __MUST__ not put the total outstanding over + -- the fixed protocol limit. + -- + -- The request also gives the number of outstanding object identifiers that + -- can now be acknowledged. The actual objects to acknowledge are known to + -- the server based on the FIFO order in which they were provided. + -- + -- There is no choice about when to use the blocking case versus the + -- non-blocking case, it depends on whether there are any remaining + -- unacknowledged objects (after taking into account the ones acknowledged + -- in this message): + -- + -- * The blocking case __MUST__ be used when there are zero remaining + -- unacknowledged objects. + -- + -- * The non-blocking case __MUST__ be used when there are non-zero + -- remaining unacknowledged objects. + MsgRequestObjectIds + :: forall (blocking :: StBlockingStyle) objectId object. + SingBlockingStyle blocking + -> NumObjectIdsAck -- ^ Acknowledge this number of outstanding objects + -> NumObjectIdsReq -- ^ Request up to this number of object ids + -> Message (ObjectDiffusion objectId object) StIdle (StObjectIds blocking) + -- | Reply with a list of object identifiers for available objects, along + -- with the size of each object. + -- + -- The list must not be longer than the maximum number requested. + -- + -- In the 'StObjectIds' 'Blocking' state the list must be non-empty while in + -- the 'StObjectIds' 'NonBlocking' state the list may be empty. + -- + -- These objects are added to the notional FIFO of outstanding object + -- identifiers for the protocol. + -- + -- The order in which these object identifiers are returned must be the + -- order in which they are submitted to the mempool, to preserve dependent + -- objects. + MsgReplyObjectIds + :: BlockingReplyList blocking objectId + -> Message (ObjectDiffusion objectId object) (StObjectIds blocking) StIdle + -- | Request one or more objects corresponding to the given object + -- identifiers. + -- + -- While it is the responsibility of the server to keep within + -- pipelining in-flight limits, the client must also cooperate by keeping + -- the total requested across all in-flight requests within the limits. + -- + -- It is an error to ask for object identifiers that were not + -- previously announced (via 'MsgReplyObjectIds'). + -- + -- It is an error to ask for object identifiers that are not + -- outstanding or that were already asked for. + MsgRequestObjects + :: [objectId] + -> Message (ObjectDiffusion objectId object) StIdle StObjects + -- | Reply with the requested objects, or implicitly discard. + -- + -- Objects can become invalid between the time the object + -- identifier was sent and the object being requested. Invalid + -- (including committed) objects do not need to be sent. + -- + -- Any object identifiers requested but not provided in this reply + -- should be considered as if this peer had never announced them. (Note + -- that this is no guarantee that the object is invalid, it may still + -- be valid and available from another peer). + MsgReplyObjects + :: [object] + -> Message (ObjectDiffusion objectId object) StObjects StIdle + -- | Termination message, initiated by the client side when idle. + MsgDone + :: Message (ObjectDiffusion objectId object) StIdle StDone + + type StateAgency StInit = ClientAgency + type StateAgency StIdle = ClientAgency + type StateAgency (StObjectIds b) = ServerAgency + type StateAgency StObjects = ServerAgency + type StateAgency StDone = NobodyAgency + + type StateToken = SingObjectDiffusion + +instance ( NFData objectId + , NFData object + ) + => NFData (Message (ObjectDiffusion objectId object) from to) where + rnf MsgInit = () + rnf (MsgRequestObjectIds tkbs w1 w2) = rnf tkbs `seq` rnf w1 `seq` rnf w2 + rnf (MsgReplyObjectIds brl) = rnf brl + rnf (MsgRequestObjects objIds) = rnf objIds + rnf (MsgReplyObjects objects) = rnf objects + rnf MsgDone = () + +-- | We have requests for lists of things. In the blocking case the +-- corresponding reply must be non-empty, whereas in the non-blocking case +-- and empty reply is fine. +data BlockingReplyList (blocking :: StBlockingStyle) a where + BlockingReply :: NonEmpty a -> BlockingReplyList StBlocking a + NonBlockingReply :: [a] -> BlockingReplyList StNonBlocking a +deriving instance (Eq a) => Eq (BlockingReplyList blocking a) +deriving instance (Show a) => Show (BlockingReplyList blocking a) +deriving instance Foldable (BlockingReplyList blocking) + +instance (NFData a) => NFData (BlockingReplyList blocking a) where + rnf (BlockingReply as) = rnf as + rnf (NonBlockingReply as) = rnf as + +deriving instance (Eq objectId, Eq object) + => Eq (Message (ObjectDiffusion objectId object) from to) + +deriving instance (Show objectId, Show object) + => Show (Message (ObjectDiffusion objectId object) from to) diff --git a/ouroboros-network-protocols/test/Main.hs b/ouroboros-network-protocols/test/Main.hs index 78f8f760245..3d32e0ba433 100644 --- a/ouroboros-network-protocols/test/Main.hs +++ b/ouroboros-network-protocols/test/Main.hs @@ -9,6 +9,7 @@ import Ouroboros.Network.Protocol.KeepAlive.Test qualified (tests) import Ouroboros.Network.Protocol.LocalStateQuery.Test qualified (tests) import Ouroboros.Network.Protocol.LocalTxMonitor.Test qualified (tests) import Ouroboros.Network.Protocol.LocalTxSubmission.Test qualified (tests) +import Ouroboros.Network.Protocol.ObjectDiffusion.Test qualified (tests) import Ouroboros.Network.Protocol.PeerSharing.Test qualified (tests) import Ouroboros.Network.Protocol.TxSubmission2.Test qualified (tests) import Test.AnchoredFragment qualified (tests) @@ -40,4 +41,5 @@ tests = , Ouroboros.Network.Protocol.Handshake.Test.tests , Ouroboros.Network.Protocol.KeepAlive.Test.tests , Ouroboros.Network.Protocol.PeerSharing.Test.tests + , Ouroboros.Network.Protocol.ObjectDiffusion.Test.tests ] diff --git a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/ObjectDiffusion/Test.hs b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/ObjectDiffusion/Test.hs new file mode 100644 index 00000000000..bf4a008676a --- /dev/null +++ b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/ObjectDiffusion/Test.hs @@ -0,0 +1,265 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE QuantifiedConstraints #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeFamilies #-} + +{-# OPTIONS_GHC -Wno-orphans #-} +{-# LANGUAGE DeriveGeneric #-} +module Ouroboros.Network.Protocol.ObjectDiffusion.Test (tests) where + +import Data.ByteString.Lazy (ByteString) +import Data.List (nub) +import Data.List.NonEmpty qualified as NonEmpty +import Data.Word (Word16) + +import Control.Monad.Class.MonadST (MonadST) +import Control.Monad.ST (runST) + +import Codec.Serialise (DeserialiseFailure, Serialise) +import Codec.Serialise qualified as Serialise (decode, encode) + +import Network.TypedProtocol.Codec +import Network.TypedProtocol.Codec.Properties (prop_codecM, prop_codec_splitsM) + +import Ouroboros.Network.Util.ShowProxy + +import Ouroboros.Network.Protocol.ObjectDiffusion.Codec +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + +import Test.Data.CDDL (Any (..)) +import Test.Ouroboros.Network.Protocol.Utils (prop_codec_cborM, + prop_codec_valid_cbor_encoding, splits2, splits3) +import Test.Ouroboros.Network.Utils (renderRanges) + +import Control.DeepSeq +import GHC.Generics +import Test.QuickCheck as QC +import Test.QuickCheck.Instances.ByteString () +import Test.Tasty (TestTree, testGroup) +import Test.Tasty.QuickCheck (testProperty) + + +-- +-- Test cases +-- + + +tests :: TestTree +tests = + testGroup "Ouroboros.Network.Protocol" + [ testGroup "ObjectDiffusion" + [ testProperty "codec" prop_codec + , testProperty "codec id" prop_codec_id + , testProperty "codec 2-splits" prop_codec_splits2 + , testProperty "codec 3-splits" $ withMaxSize 10 + prop_codec_splits3 + , testProperty "codec cbor" prop_codec_cbor + , testProperty "codec valid cbor" prop_codec_valid_cbor + ] + ] + +-- +-- Common types & clients and servers used in the tests in this module. +-- + +newtype Object = Object ObjectId + deriving (Eq, Show, Arbitrary, Serialise, Generic, NFData) + +instance ShowProxy Object where + showProxy _ = "Object" + +-- | We use any `CBOR.Term`. This allows us to use `any` in cddl specs. +-- +newtype ObjectId = ObjectId Any + deriving (Eq, Ord, Show, Arbitrary, Serialise, Generic, NFData) + +instance ShowProxy ObjectId where + showProxy _ = "ObjectId" + +deriving newtype instance Arbitrary NumObjectIdsAck +deriving newtype instance Arbitrary NumObjectIdsReq + +instance Arbitrary (AnyMessage (ObjectDiffusion ObjectId Object)) where + arbitrary = oneof + [ pure $ AnyMessage MsgInit + , AnyMessage + <$> ( MsgRequestObjectIds SingBlocking + <$> arbitrary + <*> arbitrary + ) + + , AnyMessage + <$> ( MsgRequestObjectIds SingNonBlocking + <$> arbitrary + <*> arbitrary + ) + + , AnyMessage + <$> MsgReplyObjectIds + <$> ( BlockingReply + . NonEmpty.fromList + . QC.getNonEmpty + ) + <$> arbitrary + + , AnyMessage + <$> MsgReplyObjectIds + <$> NonBlockingReply + <$> arbitrary + + , AnyMessage + <$> MsgRequestObjects + <$> arbitrary + + , AnyMessage + <$> MsgReplyObjects + <$> arbitrary + + , AnyMessage + <$> pure MsgDone + ] + +instance (Eq objectId + , Eq object + ) + => Eq (AnyMessage (ObjectDiffusion objectId object)) where + + (==) (AnyMessage MsgInit) + (AnyMessage MsgInit) = True + + (==) (AnyMessage (MsgRequestObjectIds SingBlocking ackNo reqNo)) + (AnyMessage (MsgRequestObjectIds SingBlocking ackNo' reqNo')) = + (ackNo, reqNo) == (ackNo', reqNo') + + (==) (AnyMessage (MsgRequestObjectIds SingNonBlocking ackNo reqNo)) + (AnyMessage (MsgRequestObjectIds SingNonBlocking ackNo' reqNo')) = + (ackNo, reqNo) == (ackNo', reqNo') + + (==) (AnyMessage (MsgReplyObjectIds (BlockingReply objectIds))) + (AnyMessage (MsgReplyObjectIds (BlockingReply objectIds'))) = + objectIds == objectIds' + + (==) (AnyMessage (MsgReplyObjectIds (NonBlockingReply objectIds))) + (AnyMessage (MsgReplyObjectIds (NonBlockingReply objectIds'))) = + objectIds == objectIds' + + (==) (AnyMessage (MsgRequestObjects objectIds)) + (AnyMessage (MsgRequestObjects objectIds')) = objectIds == objectIds' + + (==) (AnyMessage (MsgReplyObjects txs)) + (AnyMessage (MsgReplyObjects txs')) = txs == txs' + + (==) (AnyMessage MsgDone) + (AnyMessage MsgDone) = True + + _ == _ = False + + +codec :: MonadST m + => Codec + (ObjectDiffusion ObjectId Object) + DeserialiseFailure + m ByteString +codec = codecObjectDiffusion + Serialise.encode Serialise.decode + Serialise.encode Serialise.decode + + +-- | Check the codec round trip property. +-- +prop_codec + :: AnyMessage (ObjectDiffusion ObjectId Object) + -> Property +prop_codec msg = + runST (prop_codecM codec msg) + +-- | Check the codec round trip property for the id condec. +-- +prop_codec_id + :: AnyMessage (ObjectDiffusion ObjectId Object) + -> Property +prop_codec_id msg = + runST (prop_codecM codecObjectDiffusionId msg) + +-- | Check for data chunk boundary problems in the codec using 2 chunks. +-- +prop_codec_splits2 + :: AnyMessage (ObjectDiffusion ObjectId Object) + -> Property +prop_codec_splits2 msg = + runST (prop_codec_splitsM splits2 codec msg) + +-- | Check for data chunk boundary problems in the codec using 3 chunks. +-- +prop_codec_splits3 + :: AnyMessage (ObjectDiffusion ObjectId Object) + -> Property +prop_codec_splits3 msg = + labelMsg msg $ + runST (prop_codec_splitsM splits3 codec msg) + +prop_codec_cbor + :: AnyMessage (ObjectDiffusion ObjectId Object) + -> Property +prop_codec_cbor msg = + runST (prop_codec_cborM codec msg) + +-- | Check that the encoder produces a valid CBOR. +-- +prop_codec_valid_cbor + :: AnyMessage (ObjectDiffusion ObjectId Object) + -> Property +prop_codec_valid_cbor = prop_codec_valid_cbor_encoding codec + +-- +-- Local generators +-- + +data ObjectSubmissionTestParams = + ObjectSubmissionTestParams { + testMaxUnacked :: Positive (Small Word16), + testMaxObjectIdsToRequest :: Positive (Small Word16), + testMaxObjectToRequest :: Positive (Small Word16), + testTransactions :: DistinctList Object + } + deriving Show + +instance Arbitrary ObjectSubmissionTestParams where + arbitrary = + ObjectSubmissionTestParams <$> arbitrary + <*> arbitrary + <*> arbitrary + <*> arbitrary + + shrink (ObjectSubmissionTestParams a b c d) = + [ ObjectSubmissionTestParams a' b' c' d' + | (a', b', c', d') <- shrink (a, b, c, d) ] + + +newtype DistinctList a = DistinctList { fromDistinctList :: [a] } + deriving Show + +instance (Eq a, Arbitrary a) => Arbitrary (DistinctList a) where + arbitrary = DistinctList . nub <$> arbitrary + + shrink (DistinctList xs) = + [ DistinctList (nub xs') | xs' <- shrink xs ] + + +labelMsg :: AnyMessage (ObjectDiffusion objectId object) -> Property -> Property +labelMsg (AnyMessage msg) = + label (case msg of + MsgInit -> "MsgInit" + MsgRequestObjectIds {} -> "MsgRequestObjectIds" + MsgReplyObjectIds as -> "MsgReplyObjectIds " ++ renderRanges 3 (length as) + MsgRequestObjects as -> "MsgRequestObjects " ++ renderRanges 3 (length as) + MsgReplyObjects as -> "MsgReplyObjects " ++ renderRanges 3 (length as) + MsgDone -> "MsgDone" + ) diff --git a/ouroboros-network/cardano-diffusion/Cardano/Network/NodeToNode.hs b/ouroboros-network/cardano-diffusion/Cardano/Network/NodeToNode.hs index f8e1c702267..6a746b3bd5c 100644 --- a/ouroboros-network/cardano-diffusion/Cardano/Network/NodeToNode.hs +++ b/ouroboros-network/cardano-diffusion/Cardano/Network/NodeToNode.hs @@ -23,6 +23,8 @@ module Cardano.Network.NodeToNode , txSubmissionProtocolLimits , keepAliveProtocolLimits , peerSharingProtocolLimits + , perasCertDiffusionProtocolLimits + , perasVoteDiffusionProtocolLimits , defaultMiniProtocolParameters , NodeToNodeVersion (..) , NodeToNodeVersionData (..) @@ -71,17 +73,21 @@ module Cardano.Network.NodeToNode , txSubmissionMiniProtocolNum , keepAliveMiniProtocolNum , peerSharingMiniProtocolNum + , perasCertDiffusionMiniProtocolNum + , perasVoteDiffusionMiniProtocolNum ) where import Control.Exception (SomeException) import Data.ByteString.Lazy qualified as BL +import Data.Set (Set) import Data.Word import Network.Mux qualified as Mx import Network.Socket (Socket, StructLinger (..)) import Network.Socket qualified as Socket +import Cardano.Base.FeatureFlags (CardanoFeatureFlag (..)) import Ouroboros.Network.ConnectionManager.Types (DataFlow (..), ExceptionInHandler (..)) import Ouroboros.Network.Context @@ -97,6 +103,7 @@ import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..)) import Ouroboros.Network.Protocol.Handshake.Codec import Ouroboros.Network.Protocol.Handshake.Type import Ouroboros.Network.Protocol.Handshake.Version hiding (Accept) +import Ouroboros.Network.Protocol.ObjectDiffusion.Type (NumObjectsOutstanding) import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck (..)) import Ouroboros.Network.Server.RateLimiting import Ouroboros.Network.SizeInBytes @@ -109,25 +116,32 @@ import Ouroboros.Network.TxSubmission.Inbound.V2.Policy (TxDecisionPolicy (..), data NodeToNodeProtocols appType initiatorCtx responderCtx bytes m a b = NodeToNodeProtocols { -- | chain-sync mini-protocol -- - chainSyncProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, + chainSyncProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, -- | block-fetch mini-protocol -- - blockFetchProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, + blockFetchProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, -- | tx-submission mini-protocol -- - txSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, + txSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, + + -- | Peras certificate diffusion mini-protocol + -- + perasCertDiffusionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, + + -- | Peras vote diffusion mini-protocol + -- + perasVoteDiffusionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, -- | keep-alive mini-protocol -- - keepAliveProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, + keepAliveProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b, -- | peer sharing mini-protocol -- - peerSharingProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b - - } + peerSharingProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b +} type NodeToNodeProtocolsWithExpandedCtx appType ntnAddr bytes m a b = NodeToNodeProtocols appType (ExpandedInitiatorContext ntnAddr m) (ResponderContext ntnAddr) bytes m a b @@ -136,11 +150,11 @@ type NodeToNodeProtocolsWithMinimalCtx appType ntnAddr bytes m a b = data MiniProtocolParameters = MiniProtocolParameters { - chainSyncPipeliningHighMark :: !Word16, + chainSyncPipeliningHighMark :: !Word16, -- ^ high threshold for pipelining (we will never exceed that many -- messages pipelined). - chainSyncPipeliningLowMark :: !Word16, + chainSyncPipeliningLowMark :: !Word16, -- ^ low threshold: if we hit the 'chainSyncPipeliningHighMark' we will -- listen for responses until there are at most -- 'chainSyncPipeliningLowMark' pipelined message @@ -150,19 +164,51 @@ data MiniProtocolParameters = MiniProtocolParameters { -- Note: 'chainSyncPipeliningLowMark' and 'chainSyncPipeliningLowMark' -- are passed to 'pipelineDecisionLowHighMark'. - blockFetchPipeliningMax :: !Word16, + blockFetchPipeliningMax :: !Word16, -- ^ maximal number of pipelined messages in 'block-fetch' mini-protocol. - txDecisionPolicy :: !TxDecisionPolicy + txDecisionPolicy :: !TxDecisionPolicy, -- ^ tx submission protocol decision logic parameters + + perasCertDiffusionMaxFifoLength :: !NumObjectsOutstanding, + -- ^ Maximum number of PerasCerts in the outbound peer's outstanding FIFO. + -- + -- This indirectly limits the number of pipelined requests from the inbound peer: + -- the inbound peer can only request @n@ new IDs if the execution of preceding + -- requests would result in at least @n@ empty seats in the FIFO. + -- + -- In the worst case: + -- + -- * The inbound peer requests IDs and objects one by one. + -- * The inbound peer is aware of @perasCertDiffusionMaxFifoLength@ IDs for objects + -- it hasn't requested yet (i.e., the FIFO is full). + -- + -- Then, the inbound peer can pipeline at most @perasCertDiffusionMaxFifoLength@ + -- requests for one object each (with a known ID), and up to + -- @perasCertDiffusionMaxFifoLength@ requests for one new ID each. + -- + -- So, the theoretical maximum pipeline size is + -- @2 * perasCertDiffusionMaxFifoLength@, but in practice the pipeline size will + -- be much smaller, as the inbound peer typically batches requests. + + perasVoteDiffusionMaxFifoLength :: !NumObjectsOutstanding + -- ^ Maximum number of PerasVotes in the outbound peer's outstanding FIFO. + -- See comment on 'perasCertDiffusionMaxFifoLength' for more details to + -- understand why this indirectly limits the number of pipelined requests. } defaultMiniProtocolParameters :: MiniProtocolParameters defaultMiniProtocolParameters = MiniProtocolParameters { - chainSyncPipeliningLowMark = 200 - , chainSyncPipeliningHighMark = 300 - , blockFetchPipeliningMax = 100 - , txDecisionPolicy = defaultTxDecisionPolicy + chainSyncPipeliningLowMark = 200 + , chainSyncPipeliningHighMark = 300 + , blockFetchPipeliningMax = 100 + , txDecisionPolicy = defaultTxDecisionPolicy + -- | TODO: this value is still being discussed. + -- See https://github.com/tweag/cardano-peras/issues/97 for reference. + , perasCertDiffusionMaxFifoLength = 10 + -- | TODO: this value is still being discussed. + -- See https://github.com/tweag/cardano-peras/issues/97 for reference. + , perasVoteDiffusionMaxFifoLength = 10_000 } -- | Make an 'OuroborosApplication' for the bundle of mini-protocols that @@ -184,15 +230,16 @@ defaultMiniProtocolParameters = MiniProtocolParameters { -- both protocols, e.g. wireshark plugins. -- nodeToNodeProtocols - :: MiniProtocolParameters + :: Set CardanoFeatureFlag + -> MiniProtocolParameters -> NodeToNodeProtocols muxMode initiatorCtx responderCtx bytes m a b -> NodeToNodeVersion -- ^ negotiated version number -> NodeToNodeVersionData -- ^ negotiated version data -> OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b -nodeToNodeProtocols miniProtocolParameters protocols - _version NodeToNodeVersionData { peerSharing } +nodeToNodeProtocols featureFlags miniProtocolParameters protocols + version NodeToNodeVersionData { peerSharing } = TemperatureBundle -- Hot protocols: 'chain-sync', 'block-fetch' and 'tx-submission'. @@ -200,7 +247,9 @@ nodeToNodeProtocols miniProtocolParameters protocols case protocols of NodeToNodeProtocols { chainSyncProtocol, blockFetchProtocol, - txSubmissionProtocol + txSubmissionProtocol, + perasCertDiffusionProtocol, + perasVoteDiffusionProtocol } -> [ MiniProtocol { miniProtocolNum = chainSyncMiniProtocolNum, @@ -220,7 +269,23 @@ nodeToNodeProtocols miniProtocolParameters protocols miniProtocolLimits = txSubmissionProtocolLimits miniProtocolParameters, miniProtocolRun = txSubmissionProtocol } - ]) + ] + <> concat [perasMiniProtocols | isPerasEnabled featureFlags version] + where + perasMiniProtocols = + [ MiniProtocol { + miniProtocolNum = perasCertDiffusionMiniProtocolNum, + miniProtocolStart = StartOnDemand, + miniProtocolLimits = perasCertDiffusionProtocolLimits miniProtocolParameters, + miniProtocolRun = perasCertDiffusionProtocol + } + , MiniProtocol { + miniProtocolNum = perasVoteDiffusionMiniProtocolNum, + miniProtocolStart = StartOnDemand, + miniProtocolLimits = perasVoteDiffusionProtocolLimits miniProtocolParameters, + miniProtocolRun = perasVoteDiffusionProtocol + } + ]) -- Warm protocols: reserved for 'tip-sample'. (WithWarm []) @@ -256,7 +321,9 @@ chainSyncProtocolLimits , blockFetchProtocolLimits , txSubmissionProtocolLimits , keepAliveProtocolLimits - , peerSharingProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits + , peerSharingProtocolLimits + , perasCertDiffusionProtocolLimits + , perasVoteDiffusionProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits chainSyncProtocolLimits MiniProtocolParameters { chainSyncPipeliningHighMark } = MiniProtocolLimits { @@ -375,6 +442,26 @@ peerSharingProtocolLimits _ = maximumIngressQueue = 4 * 1440 } +perasCertDiffusionProtocolLimits MiniProtocolParameters { perasCertDiffusionMaxFifoLength } = + MiniProtocolLimits { + -- The reasoning here is very similar to the 'txSubmissionProtocolLimits'. + -- + -- Peras certificates will definitely be smaller than 20 kB; potentially + -- even much smaller. + -- See https://github.com/tweag/cardano-peras/issues/97 + maximumIngressQueue = addSafetyMargin $ + fromIntegral perasCertDiffusionMaxFifoLength * 20_000 + } + +perasVoteDiffusionProtocolLimits MiniProtocolParameters { perasVoteDiffusionMaxFifoLength } = + MiniProtocolLimits { + -- Peras votes are expected to be much smaller than Peras certificates. + -- We assume an upper bound of 1 kB per vote. + -- See https://github.com/tweag/cardano-peras/issues/97 + maximumIngressQueue = addSafetyMargin $ + fromIntegral perasVoteDiffusionMaxFifoLength * 1_000 + } + chainSyncMiniProtocolNum :: MiniProtocolNum chainSyncMiniProtocolNum = MiniProtocolNum 2 @@ -390,6 +477,12 @@ keepAliveMiniProtocolNum = MiniProtocolNum 8 peerSharingMiniProtocolNum :: MiniProtocolNum peerSharingMiniProtocolNum = MiniProtocolNum 10 +perasCertDiffusionMiniProtocolNum :: MiniProtocolNum +perasCertDiffusionMiniProtocolNum = MiniProtocolNum 16 + +perasVoteDiffusionMiniProtocolNum :: MiniProtocolNum +perasVoteDiffusionMiniProtocolNum = MiniProtocolNum 17 + -- | A specialised version of @'Ouroboros.Network.Socket.connectToNode'@. -- connectTo diff --git a/ouroboros-network/changelog.d/20250929_094106_alexander.esgen_peras_diffusion.md b/ouroboros-network/changelog.d/20250929_094106_alexander.esgen_peras_diffusion.md new file mode 100644 index 00000000000..e96a3847efc --- /dev/null +++ b/ouroboros-network/changelog.d/20250929_094106_alexander.esgen_peras_diffusion.md @@ -0,0 +1,11 @@ +### Breaking + +- `Cardano.Network.NodeToNode`: Added support for certificate and vote diffusion mini-protocols for Ouroboros Peras. + - New fields in `NodeToNodeProtocols`. + - Protocol limits `perasCertDiffusionProtocolLimits`/`perasCertVoteProtocolLimits`. + - Mini-protocol numbers `perasCertDiffusionMiniProtocolNum`/`perasVoteDiffusionMiniProtocolNum`. + - `nodeToNodeProtocols` now takes a new `Set FeatureFlag` argument to control experimental features like Ouroboros Peras. + +### Non-Breaking + +- `Cardano.Network.OrphanInstances`: Added support for `NodeToNodeV_16`. diff --git a/ouroboros-network/orphan-instances/Cardano/Network/OrphanInstances.hs b/ouroboros-network/orphan-instances/Cardano/Network/OrphanInstances.hs index 8f1fd702d55..1b5294c4a44 100644 --- a/ouroboros-network/orphan-instances/Cardano/Network/OrphanInstances.hs +++ b/ouroboros-network/orphan-instances/Cardano/Network/OrphanInstances.hs @@ -68,12 +68,14 @@ instance FromJSON NodeToNodeVersion where parseJSON = \case Number 14 -> pure NodeToNodeV_14 Number 15 -> pure NodeToNodeV_15 + Number 16 -> pure NodeToNodeV_16 Number x -> fail $ "FromJSON.NodeToNodeVersion: unsupported node-to-node protocol version " ++ show x x -> fail $ "FromJSON.NodeToNodeVersion: error parsing NodeToNodeVersion: " ++ show x instance ToJSON NodeToNodeVersion where toJSON NodeToNodeV_14 = Number 14 toJSON NodeToNodeV_15 = Number 15 + toJSON NodeToNodeV_16 = Number 16 instance FromJSON NodeToClientVersion where parseJSON = \case diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index 694b832bacb..83e06a8c453 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -282,6 +282,7 @@ library cardano-diffusion build-depends: base >=4.14 && <4.22, bytestring, + cardano-base, containers, contra-tracer, dns,