Skip to content
Draft
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
fb5f0b4
Initial commit with copy-pasted TxSubmissionV2 for ObjectDiffusionV2
tbagrel1 Sep 29, 2025
e8458cb
Some refactor
tbagrel1 Sep 29, 2025
b225208
WIP: further refactor
tbagrel1 Sep 29, 2025
511ed4f
wipper
nbacquey Oct 1, 2025
6ee28bf
Finish most refactor on Types.hs
tbagrel1 Oct 3, 2025
94dc009
Fix some more errors
tbagrel1 Oct 3, 2025
2a899e9
Continue re-organizing decision impl
tbagrel1 Oct 6, 2025
fed9f0d
WIP before removing dgsObjectsPending
tbagrel1 Oct 7, 2025
dac65af
WIP: cleaning State.hs file
tbagrel1 Oct 7, 2025
8775a5d
more work on State.hs
nbacquey Oct 8, 2025
3cd2508
clean handleReceivedObjectsImpl
nbacquey Oct 8, 2025
128b6a1
First diagram attempt
tbagrel1 Oct 9, 2025
36ab9e9
Continue on handleReceivedObjectsImpl
tbagrel1 Oct 9, 2025
ffe6cf4
Stabilize State.hs, plumbing in Registry.hs
nbacquey Oct 9, 2025
cd4c6a6
Further update state.hs
tbagrel1 Oct 10, 2025
70a793f
Improve state management and diagram
tbagrel1 Oct 10, 2025
553c767
Update state and diagram following removal of `dgsObjectsLiveMultipli…
tbagrel1 Oct 13, 2025
56ee864
Update state management (again)
tbagrel1 Oct 13, 2025
ffb9f39
Remove objectsPending field
tbagrel1 Oct 13, 2025
cc30a91
Simplify pickObjectsToReq logic
tbagrel1 Oct 13, 2025
2377db9
clean up code in decision process
tbagrel1 Oct 14, 2025
e1a5987
Finalize new decision logic
tbagrel1 Oct 14, 2025
70feb95
formatting
tbagrel1 Oct 14, 2025
b7ef54c
Further polishing
tbagrel1 Oct 14, 2025
629b1b2
Remove useless function
tbagrel1 Oct 14, 2025
3ae6dc1
Remove `StdGen` from global state
nbacquey Oct 14, 2025
513291c
Make decision only for peers that haven't read their decision yet
tbagrel1 Oct 15, 2025
8c4094a
WIP V2.hs
tbagrel1 Oct 15, 2025
2b3cdd5
First version that builds! Youpiii!
tbagrel1 Oct 15, 2025
8bd6007
Fix formatting
tbagrel1 Oct 15, 2025
d6aaf0c
Update s-r-p to point on latest ouroboros-network/peras-staging
tbagrel1 Oct 20, 2025
aa34feb
Move V2.mermaid to V2.md
tbagrel1 Oct 20, 2025
4e145a3
WIP: documentation effort on registry and state
tbagrel1 Oct 20, 2025
f794d6f
Add failures for protocol errors and implementation errors
nbacquey Oct 20, 2025
b677b71
Check that received IDs are not already in pool
nbacquey Oct 21, 2025
da01ff2
Formatting and updated comments
nbacquey Oct 21, 2025
b86f0b2
Futher Documentation changes
tbagrel1 Oct 22, 2025
919796a
"Fix" mermaid diagram
nbacquey Oct 22, 2025
87e8ee6
More documentation and fixes
nbacquey Oct 22, 2025
fab43c7
Further edits to doc
tbagrel1 Oct 22, 2025
ff925ad
Protocol implem design document is ready!
tbagrel1 Oct 22, 2025
7fa719c
WIP: benchmarks for object diffusion logic
nbacquey Oct 23, 2025
987f215
Continue working on benchmark gen for ObjectDiffusionV2/makeDecisions
tbagrel1 Oct 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ allow-newer:
source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: peras-staging/pr-5202
--sha256: sha256-nTbjunQaqt6/syzSKw24Lne50083dI2SZFirG2/1T9U=
tag: 0db8669b67982cba755e80bf2e413527def41244
--sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ=
subdir:
ouroboros-network
ouroboros-network-protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1 (objectDiffusionInbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundStateView
, bracketObjectDiffusionInbound
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State
, ChainSyncClientHandleCollection (..)
, ChainSyncState (..)
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundHandle (..)
, ObjectDiffusionInboundHandleCollection (..)
, ObjectDiffusionInboundState (..)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
( SomeHeaderInFutureCheck
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundHandleCollection (..)
, newObjectDiffusionInboundHandleCollection
)
Expand Down
11 changes: 9 additions & 2 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,14 @@ library
Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server
Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server
Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Policy
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.State
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound
Expand Down Expand Up @@ -354,6 +360,7 @@ library
primitive,
psqueues ^>=0.2.3,
quiet ^>=0.2,
random,
rawlock ^>=0.1.1,
resource-registry ^>=0.1,
semialign >=1.1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1
( objectDiffusionInbound
, TraceObjectDiffusionInbound (..)
, ObjectDiffusionInboundError (..)
Expand All @@ -38,7 +38,7 @@ import Data.Word (Word64)
import GHC.Generics (Generic)
import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt)
import NoThunks.Class (NoThunks (..), unsafeNoThunks)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundStateView (..)
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
Expand Down Expand Up @@ -146,7 +146,7 @@ objectDiffusionInbound
_version
controlMessageSTM
state =
ObjectDiffusionInboundPipelined $ do
ObjectDiffusionInboundPipelined $
continueWithStateM (go Zero) initialInboundSt
where
canRequestMoreObjects :: InboundSt k object -> Bool
Expand Down Expand Up @@ -320,9 +320,10 @@ objectDiffusionInbound
-- request.
let st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested}
poolHasObject <- atomically $ opwHasObject
continueWithStateM
(go n)
(preAcknowledge st' poolHasObject collectedIds)
pure $
continueWithStateM
(go n)
(preAcknowledge st' poolHasObject collectedIds)
CollectObjects requestedIds collectedObjects -> do
let requestedIdsSet = Set.fromList requestedIds
obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects)
Expand Down Expand Up @@ -368,15 +369,16 @@ objectDiffusionInbound
traceWith tracer $
TraceObjectDiffusionProcessed
(NumObjectsProcessed (fromIntegral $ length objectsToAck))
continueWithStateM
(go n)
st
{ pendingObjects = pendingObjects''
, outstandingFifo = outstandingFifo'
, numToAckOnNextReq =
numToAckOnNextReq st
+ fromIntegral (Seq.length objectIdsToAck)
}
pure $
continueWithStateM
(go n)
st
{ pendingObjects = pendingObjects''
, outstandingFifo = outstandingFifo'
, numToAckOnNextReq =
numToAckOnNextReq st
+ fromIntegral (Seq.length objectIdsToAck)
}

goReqObjectIdsBlocking :: Stateful (InboundSt objectId object) 'Z objectId object m
goReqObjectIdsBlocking = Stateful $ \st -> do
Expand All @@ -392,20 +394,21 @@ objectDiffusionInbound
$ SendMsgRequestObjectIdsBlocking
(numToAckOnNextReq st)
numIdsToRequest
( \neCollectedIds -> do
( \neCollectedIds -> WithEffect $ do
-- We just got some new object id's, so we are no longer idling
--
-- NOTE this change of state should be made explicit:
-- https://github.com/tweag/cardano-peras/issues/144
Idling.idlingStop (odisvIdling state)
traceWith tracer TraceObjectInboundStoppedIdling
collectAndContinueWithState
(goCollect Zero)
st
{ numToAckOnNextReq = 0
, numIdsInFlight = numIdsToRequest
}
(CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
pure $
collectAndContinueWithState
(goCollect Zero)
st
{ numToAckOnNextReq = 0
, numIdsInFlight = numIdsToRequest
}
(CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
)

goReqObjectsAndObjectIdsPipelined ::
Expand Down Expand Up @@ -433,7 +436,7 @@ objectDiffusionInbound
let numIdsToRequest = numIdsToReq st

if numIdsToRequest <= 0
then continueWithStateM (go n) st
then pure $ continueWithStateM (go n) st
else
pure $
SendMsgRequestObjectIdsPipelined
Expand All @@ -454,8 +457,8 @@ objectDiffusionInbound
terminateAfterDrain ::
Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain = \case
Zero -> SendMsgDone (pure ())
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> pure $ terminateAfterDrain n
Zero -> SendMsgDone ()
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n

-------------------------------------------------------------------------------
-- Utilities to deal with stateful continuations (copied from TX-submission)
Expand Down Expand Up @@ -487,9 +490,9 @@ continueWithStateM ::
NoThunks s =>
StatefulM s n objectId object m ->
s ->
m (InboundStIdle n objectId object m ())
InboundStIdle n objectId object m ()
continueWithStateM (StatefulM f) !st =
checkInvariant (show <$> unsafeNoThunks st) (f st)
checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st)
{-# NOINLINE continueWithStateM #-}

-- | A variant of 'continueWithState' to be more easily utilized with
Expand All @@ -499,7 +502,7 @@ collectAndContinueWithState ::
StatefulCollect s n objectId object m ->
s ->
Collect objectId object ->
m (InboundStIdle n objectId object m ())
InboundStIdle n objectId object m ()
collectAndContinueWithState (StatefulCollect f) !st c =
checkInvariant (show <$> unsafeNoThunks st) (f st c)
checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st c)
{-# NOINLINE collectAndContinueWithState #-}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}

module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundState (..)
, initObjectDiffusionInboundState
, ObjectDiffusionInboundHandle (..)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2
( -- * ObjectDiffusion Inbound client
objectDiffusionInbound

-- * PeerStateAPI
, withPeer
, PeerStateAPI

-- * Supporting types
, module V2
, PeerDecisionChannelsVar
, newPeerDecisionChannelsVar
, DecisionPolicy (..)
, defaultDecisionPolicy
) where

import Control.Concurrent.Class.MonadSTM (MonadSTM, atomically)
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)
import Data.List.NonEmpty qualified as NonEmpty
import Data.Set qualified as Set
import Network.TypedProtocol
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Policy
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types as V2
import Ouroboros.Network.ControlMessage (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound

-- TODO: Add checks and validation

-- | A object-diffusion inbound side (client).
--
-- The steps are as follow
-- 1. Block on next decision from the decision logic
-- 2. Handle any available reply (`goCollect`)
-- 3. Request new objects if possible (`goReqObjects`)
-- 4. Request new ids (also responsible for ack) (`goReqIds`)
-- 5. signal psaOnDecisionCompleted (as part of `goReqIds{Blocking,NonBlocking}`)
-- And loop again
-- We need to make sure we don't go again into `goIdle` until `psaOnDecisionCompleted` has been called
objectDiffusionInbound ::
forall objectId object m.
( MonadThrow m
, MonadSTM m
) =>
Tracer m (TraceObjectDiffusionInbound objectId object) ->
ControlMessageSTM m ->
PeerStateAPI m objectId object ->
ObjectDiffusionInboundPipelined objectId object m ()
objectDiffusionInbound
tracer
controlMessageSTM
PeerStateAPI
{ psaReadDecision
, psaOnDecisionCompleted
, psaOnRequestIds
, psaOnRequestObjects
, psaOnReceiveIds
, psaOnReceiveObjects
} =
ObjectDiffusionInboundPipelined $ goIdle Zero
where
goIdle :: forall (n :: N). Nat n -> InboundStIdle n objectId object m ()
goIdle n = WithEffect $ do
ctrlMsg <- atomically controlMessageSTM
traceWith tracer $ TraceObjectDiffusionInboundReceivedControlMessage ctrlMsg
case ctrlMsg of
-- The peer selection governor is asking us to terminate the connection.
Terminate ->
pure $ terminateAfterDrain n
-- Otherwise, we can continue the protocol normally.
_continue -> do
-- Block on next decision.
decision <- psaReadDecision
traceWith tracer (TraceObjectDiffusionInboundReceivedDecision decision)
pure $ goCollect n decision

terminateAfterDrain ::
Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain = \case
Zero -> SendMsgDone ()
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n

goCollect :: Nat n -> PeerDecision objectId object -> InboundStIdle n objectId object m ()
goCollect Zero decision =
goReqObjects Zero decision
goCollect (Succ n) decision =
CollectPipelined
(Just $ goReqObjects (Succ n) decision)
( \case
CollectObjectIds numIdsRequested ids -> WithEffect $ do
-- TODO: Add checks and validation
psaOnReceiveIds numIdsRequested ids
pure $ goCollect n decision
CollectObjects _objectIds objects -> WithEffect $ do
-- TODO: Add checks and validation
psaOnReceiveObjects objects
pure $ goCollect n decision
)

goReqObjects ::
Nat n ->
PeerDecision objectId object ->
InboundStIdle n objectId object m ()
goReqObjects n object@PeerDecision{pdObjectsToReqIds} =
if Set.null pdObjectsToReqIds
then
goReqIds n object
else WithEffect $ do
psaOnRequestObjects pdObjectsToReqIds
pure $
SendMsgRequestObjectsPipelined
(Set.toList pdObjectsToReqIds)
(goReqIds (Succ n) object)

goReqIds ::
forall (n :: N).
Nat n ->
PeerDecision objectId object ->
InboundStIdle n objectId object m ()
goReqIds n pd@PeerDecision{pdCanPipelineIdsRequests} =
if pdCanPipelineIdsRequests
then goReqIdsPipelined n pd
else case n of
Zero -> goReqIdsBlocking pd
Succ{} -> error "Impossible to have pipelined requests when we have no known unacknowledged objectIds"

goReqIdsBlocking ::
PeerDecision objectId object ->
InboundStIdle Z objectId object m ()
goReqIdsBlocking PeerDecision{pdNumIdsToAck, pdNumIdsToReq} = WithEffect $ do
if pdNumIdsToReq == 0
then do
psaOnDecisionCompleted
pure $ goIdle Zero
else do
psaOnRequestIds pdNumIdsToAck pdNumIdsToReq
psaOnDecisionCompleted
pure $
SendMsgRequestObjectIdsBlocking
pdNumIdsToAck
pdNumIdsToReq
( \objectIds -> WithEffect $ do
-- TODO: Add checks and validation
psaOnReceiveIds pdNumIdsToReq (NonEmpty.toList objectIds)
pure $ goIdle Zero
)

goReqIdsPipelined ::
forall (n :: N).
Nat n ->
PeerDecision objectId object ->
InboundStIdle n objectId object m ()
goReqIdsPipelined n PeerDecision{pdNumIdsToAck, pdNumIdsToReq} = WithEffect $ do
if pdNumIdsToReq == 0
then do
psaOnDecisionCompleted
pure $ goIdle n
else do
psaOnRequestIds pdNumIdsToAck pdNumIdsToReq
psaOnDecisionCompleted
pure $
SendMsgRequestObjectIdsPipelined
pdNumIdsToAck
pdNumIdsToReq
(goIdle (Succ n))
Loading
Loading