Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
fb5f0b4
Initial commit with copy-pasted TxSubmissionV2 for ObjectDiffusionV2
tbagrel1 Sep 29, 2025
e8458cb
Some refactor
tbagrel1 Sep 29, 2025
b225208
WIP: further refactor
tbagrel1 Sep 29, 2025
511ed4f
wipper
nbacquey Oct 1, 2025
6ee28bf
Finish most refactor on Types.hs
tbagrel1 Oct 3, 2025
94dc009
Fix some more errors
tbagrel1 Oct 3, 2025
2a899e9
Continue re-organizing decision impl
tbagrel1 Oct 6, 2025
fed9f0d
WIP before removing dgsObjectsPending
tbagrel1 Oct 7, 2025
dac65af
WIP: cleaning State.hs file
tbagrel1 Oct 7, 2025
8775a5d
more work on State.hs
nbacquey Oct 8, 2025
3cd2508
clean handleReceivedObjectsImpl
nbacquey Oct 8, 2025
128b6a1
First diagram attempt
tbagrel1 Oct 9, 2025
36ab9e9
Continue on handleReceivedObjectsImpl
tbagrel1 Oct 9, 2025
ffe6cf4
Stabilize State.hs, plumbing in Registry.hs
nbacquey Oct 9, 2025
cd4c6a6
Further update state.hs
tbagrel1 Oct 10, 2025
70a793f
Improve state management and diagram
tbagrel1 Oct 10, 2025
553c767
Update state and diagram following removal of `dgsObjectsLiveMultipli…
tbagrel1 Oct 13, 2025
56ee864
Update state management (again)
tbagrel1 Oct 13, 2025
ffb9f39
Remove objectsPending field
tbagrel1 Oct 13, 2025
cc30a91
Simplify pickObjectsToReq logic
tbagrel1 Oct 13, 2025
2377db9
clean up code in decision process
tbagrel1 Oct 14, 2025
e1a5987
Finalize new decision logic
tbagrel1 Oct 14, 2025
70feb95
formatting
tbagrel1 Oct 14, 2025
b7ef54c
Further polishing
tbagrel1 Oct 14, 2025
629b1b2
Remove useless function
tbagrel1 Oct 14, 2025
3ae6dc1
Remove `StdGen` from global state
nbacquey Oct 14, 2025
513291c
Make decision only for peers that haven't read their decision yet
tbagrel1 Oct 15, 2025
8c4094a
WIP V2.hs
tbagrel1 Oct 15, 2025
2b3cdd5
First version that builds! Youpiii!
tbagrel1 Oct 15, 2025
8bd6007
Fix formatting
tbagrel1 Oct 15, 2025
d6aaf0c
Update s-r-p to point on latest ouroboros-network/peras-staging
tbagrel1 Oct 20, 2025
aa34feb
Move V2.mermaid to V2.md
tbagrel1 Oct 20, 2025
4e145a3
WIP: documentation effort on registry and state
tbagrel1 Oct 20, 2025
f794d6f
Add failures for protocol errors and implementation errors
nbacquey Oct 20, 2025
b677b71
Check that received IDs are not already in pool
nbacquey Oct 21, 2025
da01ff2
Formatting and updated comments
nbacquey Oct 21, 2025
b86f0b2
Futher Documentation changes
tbagrel1 Oct 22, 2025
919796a
"Fix" mermaid diagram
nbacquey Oct 22, 2025
87e8ee6
More documentation and fixes
nbacquey Oct 22, 2025
fab43c7
Further edits to doc
tbagrel1 Oct 22, 2025
ff925ad
Protocol implem design document is ready!
tbagrel1 Oct 22, 2025
7fa719c
WIP: benchmarks for object diffusion logic
nbacquey Oct 23, 2025
987f215
Continue working on benchmark gen for ObjectDiffusionV2/makeDecisions
tbagrel1 Oct 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ allow-newer:
source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: peras-staging/pr-5202
--sha256: sha256-nTbjunQaqt6/syzSKw24Lne50083dI2SZFirG2/1T9U=
tag: 0db8669b67982cba755e80bf2e413527def41244
--sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ=
subdir:
ouroboros-network
ouroboros-network-protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1 (objectDiffusionInbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundStateView
, bracketObjectDiffusionInbound
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State
, ChainSyncClientHandleCollection (..)
, ChainSyncState (..)
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundHandle (..)
, ObjectDiffusionInboundHandleCollection (..)
, ObjectDiffusionInboundState (..)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
( SomeHeaderInFutureCheck
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundHandleCollection (..)
, newObjectDiffusionInboundHandleCollection
)
Expand Down
93 changes: 93 additions & 0 deletions ouroboros-consensus/bench/ObjectDiffusion-bench/Main.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE TypeApplications #-}

-- | This module contains benchmarks for Peras Object diffusion decision logic
-- as implemented by the by the function
-- 'Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision.makeDecision'
module Main (main) where

import Control.DeepSeq (NFData (..))
import Control.Exception (evaluate)
import Data.Hashable (Hashable)
import Debug.Trace (traceMarkerIO)
import GHC.Generics (Generic)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision qualified as OD
import System.Random.SplitMix qualified as SM
import Test.QuickCheck (Arbitrary (..))
import Test.Tasty.Bench

-- TODO: We will probably want to use the actual types used in vote/cert diffusion,
-- instead of placeholders.
newtype DummyPeerAddr = DummyPeerAddr Int
deriving (Eq, Ord, Generic, NFData)

instance Arbitrary DummyPeerAddr where
arbitrary = DummyPeerAddr <$> arbitrary

newtype DummyObjectId = DummyObjectId Int
deriving (Eq, Ord, Generic, Hashable, NFData)

instance Arbitrary DummyObjectId where
arbitrary = DummyObjectId <$> arbitrary

data DummyObject = DummyObject
{ doId :: DummyObjectId
, doPayload :: ()
}
deriving (Generic, NFData)

instance Arbitrary DummyObject where
arbitrary = DummyObject <$> arbitrary <*> arbitrary

main :: IO ()
main =
defaultMain
[ bgroup
"ouroboros-consensus:ObjectDiffusion"
[ bgroup
"VoteDiffusion"
[ env
( do
let a = OD.mkDecisionContext (SM.mkSMGen 123) 10 Nothing
evaluate (rnf a)
traceMarkerIO "evaluated decision context"
return a
)
( \a ->
bench "makeDecisions: 10" $
nf makeVoteDiffusionDecision a
)
, env
( do
let a = OD.mkDecisionContext (SM.mkSMGen 456) 100 Nothing
evaluate (rnf a)
traceMarkerIO "evaluated decision context"
return a
)
( \a ->
bench "makeDecisions: 100" $
nf makeVoteDiffusionDecision a
)
, env
( do
let a = OD.mkDecisionContext (SM.mkSMGen 789) 1_000 Nothing
evaluate (rnf a)
traceMarkerIO "evaluated decision context"
return a
)
( \a ->
bench "makeDecisions: 1_000" $
nf makeVoteDiffusionDecision a
)
]
, bgroup "CertDiffusion" []
]
]
where
-- TODO: We probably want to use the decision policy for vote/cert diffusion
-- instead of an arbitrary one.
makeVoteDiffusionDecision = \decisionContext -> OD.makeDecisions @DummyPeerAddr @DummyObjectId @DummyObject decisionContext
28 changes: 26 additions & 2 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,14 @@ library
Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server
Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server
Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Policy
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.State
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound
Expand Down Expand Up @@ -353,7 +359,9 @@ library
ouroboros-network-protocols ^>=0.15,
primitive,
psqueues ^>=0.2.3,
QuickCheck,
quiet ^>=0.2,
random,
rawlock ^>=0.1.1,
resource-registry ^>=0.1,
semialign >=1.1,
Expand All @@ -362,6 +370,7 @@ library
small-steps ^>=1.1,
sop-core ^>=0.5,
sop-extras ^>=0.4,
splitmix,
streaming,
strict >=0.1 && <0.6,
strict-checked-vars ^>=0.2,
Expand Down Expand Up @@ -860,6 +869,21 @@ benchmark PerasCertDB-bench
tasty-bench,
unstable-consensus-testlib,

benchmark ObjectDiffusion-bench
import: common-bench
type: exitcode-stdio-1.0
hs-source-dirs: bench/ObjectDiffusion-bench
main-is: Main.hs
other-modules:
build-depends:
base,
deepseq,
hashable,
ouroboros-consensus,
QuickCheck,
splitmix,
tasty-bench,

test-suite doctest
import: common-test
main-is: doctest.hs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1
( objectDiffusionInbound
, TraceObjectDiffusionInbound (..)
, ObjectDiffusionInboundError (..)
Expand All @@ -38,7 +38,7 @@ import Data.Word (Word64)
import GHC.Generics (Generic)
import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt)
import NoThunks.Class (NoThunks (..), unsafeNoThunks)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundStateView (..)
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
Expand Down Expand Up @@ -146,7 +146,7 @@ objectDiffusionInbound
_version
controlMessageSTM
state =
ObjectDiffusionInboundPipelined $ do
ObjectDiffusionInboundPipelined $
continueWithStateM (go Zero) initialInboundSt
where
canRequestMoreObjects :: InboundSt k object -> Bool
Expand Down Expand Up @@ -320,9 +320,10 @@ objectDiffusionInbound
-- request.
let st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested}
poolHasObject <- atomically $ opwHasObject
continueWithStateM
(go n)
(preAcknowledge st' poolHasObject collectedIds)
pure $
continueWithStateM
(go n)
(preAcknowledge st' poolHasObject collectedIds)
CollectObjects requestedIds collectedObjects -> do
let requestedIdsSet = Set.fromList requestedIds
obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects)
Expand Down Expand Up @@ -368,15 +369,16 @@ objectDiffusionInbound
traceWith tracer $
TraceObjectDiffusionProcessed
(NumObjectsProcessed (fromIntegral $ length objectsToAck))
continueWithStateM
(go n)
st
{ pendingObjects = pendingObjects''
, outstandingFifo = outstandingFifo'
, numToAckOnNextReq =
numToAckOnNextReq st
+ fromIntegral (Seq.length objectIdsToAck)
}
pure $
continueWithStateM
(go n)
st
{ pendingObjects = pendingObjects''
, outstandingFifo = outstandingFifo'
, numToAckOnNextReq =
numToAckOnNextReq st
+ fromIntegral (Seq.length objectIdsToAck)
}

goReqObjectIdsBlocking :: Stateful (InboundSt objectId object) 'Z objectId object m
goReqObjectIdsBlocking = Stateful $ \st -> do
Expand All @@ -392,20 +394,21 @@ objectDiffusionInbound
$ SendMsgRequestObjectIdsBlocking
(numToAckOnNextReq st)
numIdsToRequest
( \neCollectedIds -> do
( \neCollectedIds -> WithEffect $ do
-- We just got some new object id's, so we are no longer idling
--
-- NOTE this change of state should be made explicit:
-- https://github.com/tweag/cardano-peras/issues/144
Idling.idlingStop (odisvIdling state)
traceWith tracer TraceObjectInboundStoppedIdling
collectAndContinueWithState
(goCollect Zero)
st
{ numToAckOnNextReq = 0
, numIdsInFlight = numIdsToRequest
}
(CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
pure $
collectAndContinueWithState
(goCollect Zero)
st
{ numToAckOnNextReq = 0
, numIdsInFlight = numIdsToRequest
}
(CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
)

goReqObjectsAndObjectIdsPipelined ::
Expand Down Expand Up @@ -433,7 +436,7 @@ objectDiffusionInbound
let numIdsToRequest = numIdsToReq st

if numIdsToRequest <= 0
then continueWithStateM (go n) st
then pure $ continueWithStateM (go n) st
else
pure $
SendMsgRequestObjectIdsPipelined
Expand All @@ -454,8 +457,8 @@ objectDiffusionInbound
terminateAfterDrain ::
Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain = \case
Zero -> SendMsgDone (pure ())
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> pure $ terminateAfterDrain n
Zero -> SendMsgDone ()
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n

-------------------------------------------------------------------------------
-- Utilities to deal with stateful continuations (copied from TX-submission)
Expand Down Expand Up @@ -487,9 +490,9 @@ continueWithStateM ::
NoThunks s =>
StatefulM s n objectId object m ->
s ->
m (InboundStIdle n objectId object m ())
InboundStIdle n objectId object m ()
continueWithStateM (StatefulM f) !st =
checkInvariant (show <$> unsafeNoThunks st) (f st)
checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st)
{-# NOINLINE continueWithStateM #-}

-- | A variant of 'continueWithState' to be more easily utilized with
Expand All @@ -499,7 +502,7 @@ collectAndContinueWithState ::
StatefulCollect s n objectId object m ->
s ->
Collect objectId object ->
m (InboundStIdle n objectId object m ())
InboundStIdle n objectId object m ()
collectAndContinueWithState (StatefulCollect f) !st c =
checkInvariant (show <$> unsafeNoThunks st) (f st c)
checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st c)
{-# NOINLINE collectAndContinueWithState #-}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}

module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
( ObjectDiffusionInboundState (..)
, initObjectDiffusionInboundState
, ObjectDiffusionInboundHandle (..)
Expand Down
Loading
Loading