diff --git a/cabal.project b/cabal.project index 17c612c568..6fa991395e 100644 --- a/cabal.project +++ b/cabal.project @@ -59,8 +59,8 @@ allow-newer: source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-network - tag: peras-staging/pr-5202 - --sha256: sha256-nTbjunQaqt6/syzSKw24Lne50083dI2SZFirG2/1T9U= + tag: 0db8669b67982cba755e80bf2e413527def41244 + --sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ= subdir: ouroboros-network ouroboros-network-protocols diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index 8b8f27b7b2..494faa0505 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -70,8 +70,8 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client ) import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient import Ouroboros.Consensus.MiniProtocol.ChainSync.Server -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1 (objectDiffusionInbound) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State ( ObjectDiffusionInboundStateView , bracketObjectDiffusionInbound ) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM/PeerState.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM/PeerState.hs index defc3abe33..ce092dad15 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM/PeerState.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM/PeerState.hs @@ -17,7 +17,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State , ChainSyncClientHandleCollection (..) , ChainSyncState (..) ) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State ( ObjectDiffusionInboundHandle (..) , ObjectDiffusionInboundHandleCollection (..) , ObjectDiffusionInboundState (..) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index cff137e5a6..251107e713 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -82,7 +82,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck ( SomeHeaderInFutureCheck ) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State ( ObjectDiffusionInboundHandleCollection (..) 
    , newObjectDiffusionInboundHandleCollection
    )
diff --git a/ouroboros-consensus/bench/ObjectDiffusion-bench/Main.hs b/ouroboros-consensus/bench/ObjectDiffusion-bench/Main.hs
new file mode 100644
index 0000000000..963f3c026a
--- /dev/null
+++ b/ouroboros-consensus/bench/ObjectDiffusion-bench/Main.hs
@@ -0,0 +1,93 @@
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE ImportQualifiedPost #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE NumericUnderscores #-}
+{-# LANGUAGE TypeApplications #-}
+
+-- | This module contains benchmarks for the Peras object diffusion decision logic
+-- as implemented by the function
+-- 'Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision.makeDecisions'
+module Main (main) where
+
+import Control.DeepSeq (NFData (..))
+import Control.Exception (evaluate)
+import Data.Hashable (Hashable)
+import Debug.Trace (traceMarkerIO)
+import GHC.Generics (Generic)
+import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision qualified as OD
+import System.Random.SplitMix qualified as SM
+import Test.QuickCheck (Arbitrary (..))
+import Test.Tasty.Bench
+
+-- TODO: We will probably want to use the actual types used in vote/cert diffusion,
+-- instead of placeholders.
+newtype DummyPeerAddr = DummyPeerAddr Int
+  deriving (Eq, Ord, Generic, NFData)
+
+instance Arbitrary DummyPeerAddr where
+  arbitrary = DummyPeerAddr <$> arbitrary
+
+newtype DummyObjectId = DummyObjectId Int
+  deriving (Eq, Ord, Generic, Hashable, NFData)
+
+instance Arbitrary DummyObjectId where
+  arbitrary = DummyObjectId <$> arbitrary
+
+data DummyObject = DummyObject
+  { doId :: DummyObjectId
+  , doPayload :: ()
+  }
+  deriving (Generic, NFData)
+
+instance Arbitrary DummyObject where
+  arbitrary = DummyObject <$> arbitrary <*> arbitrary
+
+main :: IO ()
+main =
+  defaultMain
+    [ bgroup
+        "ouroboros-consensus:ObjectDiffusion"
+        [ bgroup
+            "VoteDiffusion"
+            [ env
+                ( do
+                    let a = OD.mkDecisionContext (SM.mkSMGen 123) 10 Nothing
+                    evaluate (rnf a)
+                    traceMarkerIO "evaluated decision context"
+                    return a
+                )
+                ( \a ->
+                    bench "makeDecisions: 10" $
+                      nf makeVoteDiffusionDecision a
+                )
+            , env
+                ( do
+                    let a = OD.mkDecisionContext (SM.mkSMGen 456) 100 Nothing
+                    evaluate (rnf a)
+                    traceMarkerIO "evaluated decision context"
+                    return a
+                )
+                ( \a ->
+                    bench "makeDecisions: 100" $
+                      nf makeVoteDiffusionDecision a
+                )
+            , env
+                ( do
+                    let a = OD.mkDecisionContext (SM.mkSMGen 789) 1_000 Nothing
+                    evaluate (rnf a)
+                    traceMarkerIO "evaluated decision context"
+                    return a
+                )
+                ( \a ->
+                    bench "makeDecisions: 1_000" $
+                      nf makeVoteDiffusionDecision a
+                )
+            ]
+        , bgroup "CertDiffusion" []
+        ]
+    ]
+ where
+  -- TODO: We probably want to use the decision policy for vote/cert diffusion
+  -- instead of an arbitrary one.
+ makeVoteDiffusionDecision = \decisionContext -> OD.makeDecisions @DummyPeerAddr @DummyObjectId @DummyObject decisionContext diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index acbca582c1..283166c5a5 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -191,8 +191,14 @@ library Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server - Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound - Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1 + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2 + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Policy + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.State + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound @@ -353,7 +359,9 @@ library ouroboros-network-protocols ^>=0.15, primitive, psqueues ^>=0.2.3, + QuickCheck, quiet ^>=0.2, + random, rawlock ^>=0.1.1, resource-registry ^>=0.1, semialign >=1.1, @@ -362,6 +370,7 @@ library small-steps ^>=1.1, sop-core ^>=0.5, sop-extras ^>=0.4, + splitmix, streaming, strict >=0.1 && <0.6, strict-checked-vars ^>=0.2, @@ -860,6 +869,21 @@ benchmark PerasCertDB-bench tasty-bench, unstable-consensus-testlib, +benchmark ObjectDiffusion-bench + import: common-bench + type: exitcode-stdio-1.0 + hs-source-dirs: bench/ObjectDiffusion-bench + main-is: Main.hs + other-modules: + build-depends: + base, + deepseq, + hashable, + ouroboros-consensus, + QuickCheck, + splitmix, + tasty-bench, + test-suite doctest import: common-test main-is: doctest.hs diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V1.hs similarity index 93% rename from ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs rename to ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V1.hs index a368682c40..8ca2041356 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V1.hs @@ -11,7 +11,7 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} -module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1 ( objectDiffusionInbound , TraceObjectDiffusionInbound (..) , ObjectDiffusionInboundError (..) 
@@ -38,7 +38,7 @@ import Data.Word (Word64) import GHC.Generics (Generic) import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt) import NoThunks.Class (NoThunks (..), unsafeNoThunks) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State ( ObjectDiffusionInboundStateView (..) ) import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API @@ -146,7 +146,7 @@ objectDiffusionInbound _version controlMessageSTM state = - ObjectDiffusionInboundPipelined $ do + ObjectDiffusionInboundPipelined $ continueWithStateM (go Zero) initialInboundSt where canRequestMoreObjects :: InboundSt k object -> Bool @@ -320,9 +320,10 @@ objectDiffusionInbound -- request. let st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested} poolHasObject <- atomically $ opwHasObject - continueWithStateM - (go n) - (preAcknowledge st' poolHasObject collectedIds) + pure $ + continueWithStateM + (go n) + (preAcknowledge st' poolHasObject collectedIds) CollectObjects requestedIds collectedObjects -> do let requestedIdsSet = Set.fromList requestedIds obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects) @@ -368,15 +369,16 @@ objectDiffusionInbound traceWith tracer $ TraceObjectDiffusionProcessed (NumObjectsProcessed (fromIntegral $ length objectsToAck)) - continueWithStateM - (go n) - st - { pendingObjects = pendingObjects'' - , outstandingFifo = outstandingFifo' - , numToAckOnNextReq = - numToAckOnNextReq st - + fromIntegral (Seq.length objectIdsToAck) - } + pure $ + continueWithStateM + (go n) + st + { pendingObjects = pendingObjects'' + , outstandingFifo = outstandingFifo' + , numToAckOnNextReq = + numToAckOnNextReq st + + fromIntegral (Seq.length objectIdsToAck) + } goReqObjectIdsBlocking :: Stateful (InboundSt objectId object) 'Z objectId object m goReqObjectIdsBlocking = Stateful $ \st -> do @@ -392,20 +394,21 @@ objectDiffusionInbound $ SendMsgRequestObjectIdsBlocking (numToAckOnNextReq st) numIdsToRequest - ( \neCollectedIds -> do + ( \neCollectedIds -> WithEffect $ do -- We just got some new object id's, so we are no longer idling -- -- NOTE this change of state should be made explicit: -- https://github.com/tweag/cardano-peras/issues/144 Idling.idlingStop (odisvIdling state) traceWith tracer TraceObjectInboundStoppedIdling - collectAndContinueWithState - (goCollect Zero) - st - { numToAckOnNextReq = 0 - , numIdsInFlight = numIdsToRequest - } - (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds)) + pure $ + collectAndContinueWithState + (goCollect Zero) + st + { numToAckOnNextReq = 0 + , numIdsInFlight = numIdsToRequest + } + (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds)) ) goReqObjectsAndObjectIdsPipelined :: @@ -433,7 +436,7 @@ objectDiffusionInbound let numIdsToRequest = numIdsToReq st if numIdsToRequest <= 0 - then continueWithStateM (go n) st + then pure $ continueWithStateM (go n) st else pure $ SendMsgRequestObjectIdsPipelined @@ -454,8 +457,8 @@ objectDiffusionInbound terminateAfterDrain :: Nat n -> InboundStIdle n objectId object m () terminateAfterDrain = \case - Zero -> SendMsgDone (pure ()) - Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> pure $ terminateAfterDrain n + Zero -> SendMsgDone () + Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n ------------------------------------------------------------------------------- -- Utilities to deal with stateful continuations (copied from TX-submission) @@ -487,9 +490,9 @@ 
continueWithStateM :: NoThunks s => StatefulM s n objectId object m -> s -> - m (InboundStIdle n objectId object m ()) + InboundStIdle n objectId object m () continueWithStateM (StatefulM f) !st = - checkInvariant (show <$> unsafeNoThunks st) (f st) + checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st) {-# NOINLINE continueWithStateM #-} -- | A variant of 'continueWithState' to be more easily utilized with @@ -499,7 +502,7 @@ collectAndContinueWithState :: StatefulCollect s n objectId object m -> s -> Collect objectId object -> - m (InboundStIdle n objectId object m ()) + InboundStIdle n objectId object m () collectAndContinueWithState (StatefulCollect f) !st c = - checkInvariant (show <$> unsafeNoThunks st) (f st c) + checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st c) {-# NOINLINE collectAndContinueWithState #-} diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/State.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V1/State.hs similarity index 99% rename from ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/State.hs rename to ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V1/State.hs index 58402da64f..3aa84c3915 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/State.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V1/State.hs @@ -6,7 +6,7 @@ {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE UndecidableInstances #-} -module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State ( ObjectDiffusionInboundState (..) , initObjectDiffusionInboundState , ObjectDiffusionInboundHandle (..) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2.hs new file mode 100644 index 0000000000..c12ca20c3c --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2.hs @@ -0,0 +1,175 @@ +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2 + ( -- * ObjectDiffusion Inbound client + objectDiffusionInbound + + -- * PeerStateAPI + , withPeer + , PeerStateAPI + + -- * Supporting types + , module V2 + , PeerDecisionChannelsVar + , newPeerDecisionChannelsVar + , DecisionPolicy (..) 
+  , defaultDecisionPolicy
+  ) where
+
+import Control.Concurrent.Class.MonadSTM (MonadSTM, atomically)
+import Control.Monad.Class.MonadThrow
+import Control.Tracer (Tracer, traceWith)
+import Data.List.NonEmpty qualified as NonEmpty
+import Data.Set qualified as Set
+import Network.TypedProtocol
+import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Policy
+import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry
+import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types as V2
+import Ouroboros.Network.ControlMessage (ControlMessage (..), ControlMessageSTM)
+import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
+
+-- TODO: Add checks and validation
+
+-- | An object-diffusion inbound side (client).
+--
+-- The steps are as follows:
+-- 1. Block on the next decision from the decision logic
+-- 2. Handle any available reply (`goCollect`)
+-- 3. Request new objects if possible (`goReqObjects`)
+-- 4. Request new ids (also responsible for acks) (`goReqIds`)
+-- 5. Signal psaOnDecisionCompleted (as part of `goReqIds{Blocking,NonBlocking}`)
+-- And loop again.
+-- We need to make sure we don't go into `goIdle` again until `psaOnDecisionCompleted` has been called.
+objectDiffusionInbound ::
+  forall objectId object m.
+  ( MonadThrow m
+  , MonadSTM m
+  ) =>
+  Tracer m (TraceObjectDiffusionInbound objectId object) ->
+  ControlMessageSTM m ->
+  PeerStateAPI m objectId object ->
+  ObjectDiffusionInboundPipelined objectId object m ()
objectDiffusionInbound
+  tracer
+  controlMessageSTM
+  PeerStateAPI
+    { psaReadDecision
+    , psaOnDecisionCompleted
+    , psaOnRequestIds
+    , psaOnRequestObjects
+    , psaOnReceiveIds
+    , psaOnReceiveObjects
+    } =
+    ObjectDiffusionInboundPipelined $ goIdle Zero
+   where
+    goIdle :: forall (n :: N). Nat n -> InboundStIdle n objectId object m ()
+    goIdle n = WithEffect $ do
+      ctrlMsg <- atomically controlMessageSTM
+      traceWith tracer $ TraceObjectDiffusionInboundReceivedControlMessage ctrlMsg
+      case ctrlMsg of
+        -- The peer selection governor is asking us to terminate the connection.
+        Terminate ->
+          pure $ terminateAfterDrain n
+        -- Otherwise, we can continue the protocol normally.
+        _continue -> do
+          -- Block on the next decision.
+ decision <- psaReadDecision + traceWith tracer (TraceObjectDiffusionInboundReceivedDecision decision) + pure $ goCollect n decision + + terminateAfterDrain :: + Nat n -> InboundStIdle n objectId object m () + terminateAfterDrain = \case + Zero -> SendMsgDone () + Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n + + goCollect :: Nat n -> PeerDecision objectId object -> InboundStIdle n objectId object m () + goCollect Zero decision = + goReqObjects Zero decision + goCollect (Succ n) decision = + CollectPipelined + (Just $ goReqObjects (Succ n) decision) + ( \case + CollectObjectIds numIdsRequested ids -> WithEffect $ do + -- TODO: Add checks and validation + psaOnReceiveIds numIdsRequested ids + pure $ goCollect n decision + CollectObjects _objectIds objects -> WithEffect $ do + -- TODO: Add checks and validation + psaOnReceiveObjects objects + pure $ goCollect n decision + ) + + goReqObjects :: + Nat n -> + PeerDecision objectId object -> + InboundStIdle n objectId object m () + goReqObjects n object@PeerDecision{pdObjectsToReqIds} = + if Set.null pdObjectsToReqIds + then + goReqIds n object + else WithEffect $ do + psaOnRequestObjects pdObjectsToReqIds + pure $ + SendMsgRequestObjectsPipelined + (Set.toList pdObjectsToReqIds) + (goReqIds (Succ n) object) + + goReqIds :: + forall (n :: N). + Nat n -> + PeerDecision objectId object -> + InboundStIdle n objectId object m () + goReqIds n pd@PeerDecision{pdCanPipelineIdsRequests} = + if pdCanPipelineIdsRequests + then goReqIdsPipelined n pd + else case n of + Zero -> goReqIdsBlocking pd + Succ{} -> error "Impossible to have pipelined requests when we have no known unacknowledged objectIds" + + goReqIdsBlocking :: + PeerDecision objectId object -> + InboundStIdle Z objectId object m () + goReqIdsBlocking PeerDecision{pdNumIdsToAck, pdNumIdsToReq} = WithEffect $ do + if pdNumIdsToReq == 0 + then do + psaOnDecisionCompleted + pure $ goIdle Zero + else do + psaOnRequestIds pdNumIdsToAck pdNumIdsToReq + psaOnDecisionCompleted + pure $ + SendMsgRequestObjectIdsBlocking + pdNumIdsToAck + pdNumIdsToReq + ( \objectIds -> WithEffect $ do + -- TODO: Add checks and validation + psaOnReceiveIds pdNumIdsToReq (NonEmpty.toList objectIds) + pure $ goIdle Zero + ) + + goReqIdsPipelined :: + forall (n :: N). + Nat n -> + PeerDecision objectId object -> + InboundStIdle n objectId object m () + goReqIdsPipelined n PeerDecision{pdNumIdsToAck, pdNumIdsToReq} = WithEffect $ do + if pdNumIdsToReq == 0 + then do + psaOnDecisionCompleted + pure $ goIdle n + else do + psaOnRequestIds pdNumIdsToAck pdNumIdsToReq + psaOnDecisionCompleted + pure $ + SendMsgRequestObjectIdsPipelined + pdNumIdsToAck + pdNumIdsToReq + (goIdle (Succ n)) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2.md b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2.md new file mode 100644 index 0000000000..29e81edf4f --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2.md @@ -0,0 +1,187 @@ +# Object Diffusion Inbound Mini-Protocol V2 + +This document describes the inner workings of the inbound side of the `ObjectDiffusion` mini-protocol. A broad description of the whole protocol can be found in section 2.5 of [this document](https://tweag.github.io/cardano-peras/peras-design.pdf). 
+
+- [Object Diffusion Inbound Mini-Protocol V2](#object-diffusion-inbound-mini-protocol-v2)
+  - [General architecture](#general-architecture)
+  - [Inbound peer loop](#inbound-peer-loop)
+  - [Peer state description and lifecycle](#peer-state-description-and-lifecycle)
+  - [Acknowledgement behavior](#acknowledgement-behavior)
+  - [Download attribution process in `makeDecisions`](#download-attribution-process-in-makedecisions)
+  - [On peer disconnection](#on-peer-disconnection)
+  - [Differences with TxSubmission V2 inbound mini-protocol](#differences-with-txsubmission-v2-inbound-mini-protocol)
+
+## General architecture
+
+In `ObjectDiffusion` V2 (only the inbound side changes compared to V1), each connection to an outbound peer is no longer considered in isolation. Instead, there is a global state `DecisionGlobalState` (defined in `Types.hs`) that tracks the state of all connections together using `dgsPeerStates :: Map peerAddr (DecisionPeerState objectId object)`. Currently, this is the only field of `DecisionGlobalState`, meaning that we keep no global data other than the sum of the per-peer states. Further on, we denote _an instance of the inbound protocol connected to a specific outbound peer_ simply as _an inbound peer_.
+
+A `DecisionPeerState` holds the state of the interaction with the distant outbound peer, which is described in more detail in [this section](#fields-of-decisionpeerstate-and-their-lifecycle).
+
+The global state is read periodically by a dedicated _decision_ thread (defined in `Registry.hs` and `Decision.hs`), which computes for each inbound peer a `PeerDecision` (defined in `Types.hs`) that indicates what that peer should do next. More specifically, in the decision, the inbound peer can find:
+
+- `pdNumIdsToReq`: the number of new IDs to request from the outbound peer.
+- `pdNumIdsToAck`: the number of IDs that the peer should ack in its next request for IDs. Note that if `pdNumIdsToReq` is zero, then no request for IDs will be sent, and thus no acknowledgement will happen despite `pdNumIdsToAck` being non-zero (we might change the decision process in the future to get rid of this non-intuitive case).
+- `pdCanPipelineIdsRequests`: a flag indicating whether the peer can pipeline its requests for IDs (instead of making a blocking call).
+- `pdObjectsToReqIds`: the set of IDs of the objects that the inbound peer should request from the outbound peer.
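+
+For reference, the decision record itself looks roughly as follows. This is a sketch reconstructed from the field names used in `V2.hs` and `Decision.hs`; the authoritative definition lives in `Types.hs`, which is not part of this change, so the exact types may differ:
+
+```haskell
+import Data.Set (Set)
+import Ouroboros.Network.Protocol.ObjectDiffusion.Type (NumObjectIdsAck, NumObjectIdsReq)
+
+-- Reconstructed sketch, not the authoritative definition (see Types.hs).
+data PeerDecision objectId object = PeerDecision
+  { pdNumIdsToAck :: NumObjectIdsAck
+  -- ^ how many IDs to acknowledge on the next request for IDs
+  , pdNumIdsToReq :: NumObjectIdsReq
+  -- ^ how many new IDs to request from the outbound peer
+  , pdCanPipelineIdsRequests :: Bool
+  -- ^ whether the next request for IDs may be pipelined
+  , pdObjectsToReqIds :: Set objectId
+  -- ^ IDs of the objects to request from the outbound peer
+  , pdStatus :: PeerDecisionStatus
+  -- ^ lifecycle flag, managed by the decision logic (see below)
+  }
+
+-- The three statuses referred to throughout this document.
+data PeerDecisionStatus = DecisionUnread | DecisionBeingActedUpon | DecisionCompleted
+```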
+
+An inbound peer (defined in `V2.hs`) has no direct access to the state, neither for reading nor for writing. It only has access to a monadic API `PeerStateAPI` defined in `Registry.hs`. This API has two decision-related callbacks and four state-mutating callbacks. The peer should follow the decision and call the state-mutating callbacks accordingly, to keep the global state consistent with the actions taken.
+
+**Decision-related callbacks:**
+
+- `psaReadDecision`, which allows the inbound peer to read the current `PeerDecision` made for it by the last round of the decision logic. This call blocks if a new decision is not yet available for this peer.
+- `psaOnDecisionCompleted`, which allows the inbound peer to signal that it has executed the last decision it read, and that the decision logic should now compute a new decision for this peer.
+
+The decision additionally has a flag `pdStatus` (not intended to be read by the peer) that is set to `DecisionBeingActedUpon` when `psaReadDecision` returns, and stays at that value until the peer calls `psaOnDecisionCompleted`, indicating that it has executed the decision (at which point the status is set to `DecisionCompleted`). This is the main way the peer interacts with the decision thread. While the flag is set to `DecisionBeingActedUpon`, the global-state decision logic will not update the decision for this peer (it is locked, or "frozen").
+
+**State-mutating callbacks:**
+
+These are the callbacks that the inbound peer must call when it takes the actions dictated by the decision it read. These callbacks will update the corresponding peer state in the global state. For reference, the fields of this state are documented in [this section](#fields-of-decisionpeerstate-and-their-lifecycle).
+
+- `psaOnRequestIds` (corresponding to `onRequestIds` from `State.hs`), which must be called when emitting a request for a non-zero number of new IDs (a request which also acks previously received IDs that we no longer care about). Under the hood, `onRequestIds` will increase the `dpsNumIdsInflight` count by the requested number of IDs, and remove the acked IDs from `dpsOutstandingFifo` and `dpsObjectsAvailableIds`.
+- `psaOnReceiveIds` (corresponding to `onReceiveIds` from `State.hs`), which must be called after receiving new IDs from the outbound peer. Under the hood, `onReceiveIds` will decrease the `dpsNumIdsInflight` count by **the number of IDs that were requested in the request corresponding to this reply** (it might be more than the number of received IDs), and add the received IDs to `dpsOutstandingFifo` and `dpsObjectsAvailableIds`.
+- `psaOnRequestObjects` (corresponding to `onRequestObjects` from `State.hs`), which must be called when emitting a request for a non-zero number of new objects. Under the hood, `onRequestObjects` will remove the requested IDs from `dpsObjectsAvailableIds` and add them to `dpsObjectsInflightIds`.
+- `psaOnReceiveObjects` (corresponding to `onReceiveObjects` from `State.hs`), which must be called when receiving objects from the outbound peer. Under the hood, `onReceiveObjects` will remove the received IDs from `dpsObjectsInflightIds`, add the received objects to `dpsObjectsOwtPool`, and call the `submitObjectsToPool` subroutine, which actually inserts the objects into the object pool once the lock can be acquired (at which point the objects are removed from `dpsObjectsOwtPool`).
+
+NOTE: Protocol error-handling (e.g. making sure the outbound peer has sent the correct information) is done by the callbacks themselves, so the inbound peer doesn't have to check anything before calling these state-mutating callbacks. Preconditions that should hold, but don't due to implementation errors, are tested with `assert` throughout the code. This ensures a modicum of correctness as long as the code is sufficiently tested.
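+
+Put together, the API record looks roughly like this. The field types are inferred from the call sites in `V2.hs` (in particular, the payload type of `psaOnReceiveIds` is an assumption); the authoritative definition lives in `Registry.hs`:
+
+```haskell
+import Data.Set (Set)
+import Ouroboros.Network.Protocol.ObjectDiffusion.Type (NumObjectIdsAck, NumObjectIdsReq)
+
+-- Sketch inferred from usage, not the authoritative definition (see Registry.hs).
+data PeerStateAPI m objectId object = PeerStateAPI
+  { psaReadDecision :: m (PeerDecision objectId object)
+  -- ^ block until a fresh decision is available for this peer
+  , psaOnDecisionCompleted :: m ()
+  -- ^ signal that the last decision read has been fully executed
+  , psaOnRequestIds :: NumObjectIdsAck -> NumObjectIdsReq -> m ()
+  -- ^ record that a request for new IDs (with piggy-backed acks) was emitted
+  , psaOnRequestObjects :: Set objectId -> m ()
+  -- ^ record that a request for these objects was emitted
+  , psaOnReceiveIds :: NumObjectIdsReq -> [objectId] -> m ()
+  -- ^ record a reply with new IDs (first argument: how many IDs were requested)
+  , psaOnReceiveObjects :: [object] -> m ()
+  -- ^ record a reply carrying the requested objects
+  }
+```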
+
+## Inbound peer loop
+
+The inbound peer performs a loop where each iteration starts with (blocking on) reading a new decision, and ends with signaling that the decision has been executed. It should not return to the start of the loop too early, i.e., before it has taken all the actions dictated by the decision (except for acks, which cannot be sent when `pdNumIdsToReq == 0`), as the decision logic considers that once a decision has been read, it is effectively "frozen in" and will be performed. So in each iteration, the inbound peer should do the following steps in order:
+
+1. Read the current decision via `psaReadDecision`
+2. Then try to read any available reply from the outbound peer if there have been pipelined requests in previous rounds. It should process the reply accordingly, i.e. check that the reply conforms to the mini-protocol rules, and call either `psaOnReceiveIds` or `psaOnReceiveObjects` as needed
+3. Then request objects (if any) as per `pdObjectsToReqIds`, and call `psaOnRequestObjects` accordingly
+4. Then request IDs (if any) as per `pdNumIdsToReq` (acking `pdNumIdsToAck` as a side-effect), and call `psaOnRequestIds` accordingly
+5. Call `psaOnDecisionCompleted` to signal that a new decision should be made for this peer
+
+In the implementation, steps 2, 3 and 4 are performed by the `goCollect`, `goReqObjects` and `goReqIds` functions in `V2.hs`, which each call the next one in sequence as needed (see the schematic sketch at the end of this section).
+
+NOTE: The decision logic doesn't assume that we will first request objects and only then (request and) acknowledge IDs. Consequently, the decision logic won't ever ask to request objects whose IDs would be acknowledged in that same round.
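+
+Schematically, one iteration looks as follows. The helper names in this sketch (`collectAvailableReplies`, `requestObjects`, `requestAndAckIds`) are illustrative only; the real code in `V2.hs` implements them as the `goCollect`, `goReqObjects` and `goReqIds` continuations, and additionally threads the pipelining depth (`Nat n`) through every step:
+
+```haskell
+-- Illustrative sketch only; see goIdle/goCollect/goReqObjects/goReqIds in V2.hs.
+inboundPeerIteration = do
+  decision <- psaReadDecision -- step 1: blocks until a fresh decision is available
+  collectAvailableReplies decision -- step 2: psaOnReceiveIds / psaOnReceiveObjects
+  requestObjects decision -- step 3: psaOnRequestObjects
+  requestAndAckIds decision -- step 4: psaOnRequestIds
+  psaOnDecisionCompleted -- step 5: let the decision logic compute a new decision
+```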
+
+## Peer state description and lifecycle
+
+The following diagram indicates when and by whom the fields of the `DecisionPeerState` of an inbound peer are modified.
+
+Fields of `DecisionPeerState` are represented as rounded rectangles, while callbacks/functions are represented as diamond shapes. The entry point of the diagram is the `makeDecisions / psaReadDecision` node, which dictates the actions to be taken by the inbound peer; these actions are then reflected through the `onRequestIds` and `onRequestObjects` callbacks.
+
+Normal arrows `->` start from a function and point towards a field that is modified by this function. The label on the arrow indicates, by its sign, whether something is added or removed from the field, and also the nature of the value (count, ids, objects) being added or removed.
+
+Arrows with a rounded head show an external input of data, i.e. when the inbound peer actually receives data from the outbound peer.
+
+```mermaid
+%%{init: {"flowchart": {"htmlLabels": true}} }%%
+flowchart TD
+    A(dpsNumIdsInflight)
+    B(dpsOutstandingFifo)
+    C(dpsObjectsAvailableIds)
+    D(dpsObjectsInflightIds)
+    F(dpsObjectsOwtPool)
+
+    EA{onRequestIds}
+    EA-->|+count| A
+    EA -->|"`-ids (ack)`"| B
+    EA -->|"`-ids (non-downloaded only, ack)`"| C
+
+    EB{onReceiveIds}
+    EB -->|-count| A
+    EB -->|+ids| B
+    IN1@{ shape: lin-cyl, label: "ids" } --o EB
+    EB -->|+ids| C
+
+    EC{onRequestObjects}
+    EC -->|"`-ids (selected for download only)`"| C
+    EC -->|+ids| D
+
+    ED{onReceiveObjects}
+    ED -->|-ids| D
+    IN2@{ shape: lin-cyl, label: "objects" } --o ED
+    ED -->|+objects| F
+
+    EE{makeDecisions / psaReadDecision}
+    EA ~~~ EE
+    EC ~~~ EE
+    EE -.-o|pdNumIdsToAck + pdNumIdsToReq + pdCanPipelineIdsRequests| EA
+    EE -.-o|pdObjectsToReqIds| EC
+
+    EG{Added to pool}
+    EG -->|-objects| F
+```
+
+### Fields of `DecisionPeerState` and their lifecycle
+
+- `dpsNumIdsInflight`: The cumulative number of object IDs we have asked for in requests that have not yet been replied to. We need to track this to ensure we don't ask the outbound peer to keep more objects available at a given time than the protocol-defined limit (see `dpMaxNumObjectsOutstanding` in `Policy.hs`). This count is incremented in `onRequestIds` by the number of requested IDs, and decremented in `onReceiveIds` by **the same number of requested IDs** when the reply is received. E.g., if we request 10 IDs, then we increment the count by 10; and if later the outbound peer replies with only 7 IDs (because it had only 7 available), we still decrement the count by 10.
+- `dpsOutstandingFifo`: IDs of the objects that the outbound peer has available for us, and which we have not yet acknowledged. This is kept in the order in which the outbound peer gave them to us. It is also the order in which we acknowledge them (because acknowledgement, as in TX-submission, is made by sending the length of the prefix of the FIFO that we no longer care about, instead of providing the IDs as a set). IDs are added to this FIFO in `onReceiveIds`, and removed from this FIFO in `onRequestIds` when we acknowledge (i.e. drop) a prefix of the FIFO.
+- `dpsObjectsAvailableIds`: Set of IDs of the objects that can be requested from the outbound peer, and have not yet been requested or downloaded. This is a subset of `dpsOutstandingFifo`. IDs are added to this set in `onReceiveIds`. They can be removed from this set in two ways:
+  - when some objects are requested by their IDs in `onRequestObjects`, the corresponding IDs are removed from `dpsObjectsAvailableIds`
+  - IDs that were deliberately not requested (e.g. because we have already obtained the corresponding objects through other peers) are removed from `dpsObjectsAvailableIds` when we acknowledge a prefix of the FIFO that contains them
+- `dpsObjectsInflightIds`: The IDs of objects that have been requested from the outbound peer, but have not yet been received. IDs are added to this set in `onRequestObjects` (at the moment they are removed from `dpsObjectsAvailableIds`), and removed from this set in `onReceiveObjects` (at the moment the corresponding objects are added to `dpsObjectsOwtPool`). In ObjectDiffusion, we must receive exactly the objects that we requested, so there is no way for items in this set to linger indefinitely.
+- `dpsObjectsOwtPool`: A map of IDs to objects that have been received, and are on their way to the `ObjectPool`. As we have many inbound peers in parallel, we cannot directly insert objects into the pool when we receive them; instead, we have to wait to obtain the pool lock. So we store the received objects here in the meantime, and the subroutine `submitObjectsToPool` (launched by `onReceiveObjects`) will acquire the lock, insert them into the pool when possible, and thus remove them from `dpsObjectsOwtPool` at that moment.
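+
+Summed up as a record, the peer state has roughly the following shape. This is a sketch reconstructed from the field usage in `Decision.hs`; the authoritative definition lives in `Types.hs`, which is not part of this change:
+
+```haskell
+import Data.Map.Strict (Map)
+import Data.Sequence.Strict (StrictSeq)
+import Data.Set (Set)
+import Ouroboros.Network.Protocol.ObjectDiffusion.Type (NumObjectIdsReq)
+
+-- Reconstructed sketch, not the authoritative definition (see Types.hs).
+data DecisionPeerState objectId object = DecisionPeerState
+  { dpsNumIdsInflight :: NumObjectIdsReq
+  -- ^ cumulative count of IDs requested but not yet replied to
+  , dpsOutstandingFifo :: StrictSeq objectId
+  -- ^ unacknowledged IDs, in the order the outbound peer sent them
+  , dpsObjectsAvailableIds :: Set objectId
+  -- ^ IDs that may still be requested; a subset of the FIFO
+  , dpsObjectsInflightIds :: Set objectId
+  -- ^ IDs of objects requested but not yet received
+  , dpsObjectsOwtPool :: Map objectId object
+  -- ^ received objects on their way to the object pool
+  }
+```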
+
+## Acknowledgement behavior
+
+The ID of an object is eligible for acknowledgement from a given inbound peer when:
+
+- The corresponding object has been downloaded from its direct outbound peer, and is currently in `dpsObjectsOwtPool` of **this** inbound peer
+- The corresponding object is already in the pool (either obtained through other inbound peers, or previously downloaded and inserted by this inbound peer)
+
+So even if the validation of a received object is done at the moment the object is added to the pool, there won't be any issue. Take the example of an object that is rejected by the pool (because it has an invalid cryptographic signature, for example). In this case:
+
+- the inbound peer that submitted the object to the pool might have acked it already at the moment the object is rejected by the pool, but the rejection indicates that the outbound peer which sent us the object is adversarial, and we should disconnect from it anyway. So there is no harm done by having acked the object to the adversarial outbound peer, as we won't want to re-download this object from it again (or any other object whatsoever).
+- any other inbound peer that has this ID available from its outbound peer won't be able to ack it, because this ID isn't in **their** `dpsObjectsOwtPool` and is not in the pool either; so we will be able to download it from these other peers until we find a valid one.
+
+As in TxSubmission, acknowledgement is done by indicating to the outbound peer the length of the (longest) prefix of the outstanding FIFO that we no longer care about (i.e. for which all IDs are eligible for acknowledgement by the definition above). The field `dpsOutstandingFifo` on the inbound peer is supposed to mirror exactly the state of the FIFO of the outbound peer, barring transient discrepancies due to in-flight information.
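+
+Condensed, this eligibility test is a single `spanl` over the FIFO; the following sketch mirrors what `computeAckForPeer` does in `Decision.hs` (later in this diff):
+
+```haskell
+import Data.Map.Strict (Map)
+import Data.Map.Strict qualified as Map
+import Data.Sequence.Strict (StrictSeq)
+import Data.Sequence.Strict qualified as StrictSeq
+
+-- Split the FIFO into the longest acknowledgeable prefix (objects already in
+-- the pool, or in this peer's owt-pool) and the suffix we still care about.
+ackPrefix ::
+  Ord objectId =>
+  (objectId -> Bool) -> -- does the pool already contain this object?
+  Map objectId object -> -- this peer's dpsObjectsOwtPool
+  StrictSeq objectId -> -- dpsOutstandingFifo
+  (StrictSeq objectId, StrictSeq objectId)
+ackPrefix poolHasObject owtPool =
+  StrictSeq.spanl (\objectId -> poolHasObject objectId || objectId `Map.member` owtPool)
+```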
We count as "in the process of being acquired" any object that is either: + +- in `dpsObjectsInFlightIds` of any active peer +- in `dpsObjectsInFlightIds` **or** in `pdObjectsToReqIds` of any frozen peer (because we consider that a frozen peer will execute its decision to completion, even if `onRequestObjects`, that adds items to `dpsObjectsInFlightIds`, hasn't been called yet by it) +- in `dpsObjectsOwtPool` of any peer + +For each object, sequentially, we then try to select as many providers as the difference between the redundancy target (`dpTargetObjectRedundancy` in `Policy.hs`) and the number of copies of this object already in the process of being acquired. But we also make sure, when selecting providers, that we don't go beyond the limit of objects in flight for each potential provider, and that we don't go beyond the limit of total objects in flight for our node too (defined by `dpMaxNumObjectsInflightPerPeer` and `dpMaxNumObjectsInflightTotal` in `Policy.hs`). + +The result is a map from active peers to the set of object IDs that should be requested from them. This map is then merged with the partial decisions computed by `computeAck` to produce the final decisions for each active peer, that are then propagated to the peers through their decision channels (in `Registry.hs`). + +At the moment, the algorithm is eager towards securing the target number of copies for each object, at the detriment of object coverage and peer load balancing. Future improvements could be made to address this if needed. + +NOTE: the decision logic doesn't make any changes to the global state; it only reads it. All changes to the global state are made by the inbound peers through the `PeerStateAPI` callbacks. + +## On peer disconnection + +The inbound peers are registered in the global state and decision channels map through a `bracket` function in `Registry.hs`. When a peer disconnects, the corresponding entry in the decision channels map and global state are automatically removed. + +As the global state is only a map of per-peer states at the moment, this means that we don't need to take any other particular action to clean up the global state following the disconnection of a peer. + +Any error protocol-wise (e.g. receiving invalid data from the outbound peer) or receiving objects that are rejected by the pool (e.g. if they don't have valid cryptographic signatures) should throw an exception, that will automatically lead to disconnection (and thus triggering cleanup). + +Following a peer disconnection, the next round of decision-logic will readjust accordingly. For example, if some object was in the process of being downloaded from the disconnected peer, the next round of the decision logic will see that we have fewer copies in the process of being acquired than before, and thus will ask other providers to download it. + +## Differences with TxSubmission V2 inbound mini-protocol + +Although both mini-protocol inbound implementations share the same general structure (global state and global decision thread, with peer registering through a bracket function, peer interacting through an API with the global state), there are some major differences in the implementation: + +- ObjectDiffusion decision process doesn't modify the global state at all, unlike TxSubmission one. This is true for acknowledgment computation too (that is part of decision making). Instead, all modifications to the global state are made by the inbound peers through the `PeerStateAPI` callbacks. 
+
+## On peer disconnection
+
+The inbound peers are registered in the global state and the decision channels map through a `bracket` function in `Registry.hs`. When a peer disconnects, the corresponding entries in the decision channels map and the global state are automatically removed.
+
+As the global state is only a map of per-peer states at the moment, this means that we don't need to take any other particular action to clean up the global state following the disconnection of a peer.
+
+Any protocol error (e.g. receiving invalid data from the outbound peer) or reception of objects that are rejected by the pool (e.g. because they don't have valid cryptographic signatures) should throw an exception, which will automatically lead to disconnection (and thus trigger cleanup).
+
+Following a peer disconnection, the next round of the decision logic will readjust accordingly. For example, if some object was in the process of being downloaded from the disconnected peer, the next round of the decision logic will see that we have fewer copies in the process of being acquired than before, and will thus ask other providers to download it.
+
+## Differences with TxSubmission V2 inbound mini-protocol
+
+Although both mini-protocol inbound implementations share the same general structure (global state and global decision thread, peers registering through a bracket function and interacting with the global state through an API), there are some major differences in the implementation:
+
+- The ObjectDiffusion decision process doesn't modify the global state at all, unlike the TxSubmission one. This is true for acknowledgement computation too (which is part of decision making). Instead, all modifications to the global state are made by the inbound peers through the `PeerStateAPI` callbacks. This makes the decision logic more straightforward.
+- The ObjectDiffusion decision process doesn't pre-filter peers based on their individual `DecisionPeerState` to know whether or not we should generate a decision for them. Instead, we use the `pdStatus` field of the decision, updated through the `psaReadDecision` and `psaOnDecisionCompleted` callbacks, to know whether or not we should compute a new decision for a peer. The conditions under which we compute a new decision are also different: we compute a new decision for a peer if it is not currently executing a decision (i.e. its status is `DecisionUnread` or `DecisionCompleted`), instead of checking various fields of its `DecisionPeerState`.
+- ObjectDiffusion relies on the `opwHasObject` method of the `ObjectPoolWriter` to know whether or not an object is already in the pool, instead of tracking this information in the global state with a retention delay. This simplifies the global state and the implementation a lot, but depends on the `ObjectPoolWriter` implementation to provide a fairly cost-efficient `opwHasObject`, as it is called often.
+- Similarly, ObjectDiffusion gets rid of many global maps that were slightly redundant with information already present in each peer's state, as the only time we need to use these maps is during decision-making. So for the time being, we recompute the specific parts of this global view that we need at each round of decision-making, instead of maintaining them up-to-date at all times. We might need to revisit this later for performance purposes.
+- ObjectDiffusion also doesn't have a concept of ranking/scoring for peers, as an invalid object must lead to immediate disconnection. So the decision logic doesn't need to consider peer quality when attributing downloads.
+- In ObjectDiffusion, the global state is not modified directly outside of `State.hs` (and `Registry.hs` when registering/unregistering peers).
diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Decision.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Decision.hs
new file mode 100644
index 0000000000..29f96b2b2f
--- /dev/null
+++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Decision.hs
@@ -0,0 +1,396 @@
+{-# LANGUAGE BlockArguments #-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DerivingVia #-}
+{-# LANGUAGE ImportQualifiedPost #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
+
+module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision
+  ( PeerDecision (..)
+  , makeDecisions
+
+    -- * Internal API exposed for testing
+  , DecisionContext (..)
+  , mkDecisionContext
+  ) where
+
+import Control.DeepSeq (NFData (..))
+import Data.Foldable qualified as Foldable
+import Data.Hashable (Hashable (..))
+import Data.Map.Merge.Strict qualified as Map
+import Data.Map.Strict (Map)
+import Data.Map.Strict qualified as Map
+import Data.Maybe (fromMaybe)
+import Data.Sequence.Strict (StrictSeq)
+import Data.Sequence.Strict qualified as StrictSeq
+import Data.Set (Set)
+import Data.Set qualified as Set
+import GHC.Generics (Generic)
+import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Policy
+import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.State
+import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types
+import Ouroboros.Network.Protocol.ObjectDiffusion.Type
+import System.Random (StdGen, mkStdGen)
+import System.Random.SplitMix (SMGen, nextInt)
+import Test.QuickCheck (Arbitrary (..))
+import Test.QuickCheck.Gen (Gen (..))
+import Test.QuickCheck.Random (QCGen (..))
+
+data DecisionContext peerAddr objectId object = DecisionContext
+  { dcRng :: StdGen
+  , dcHasObject :: (objectId -> Bool)
+  , dcDecisionPolicy :: DecisionPolicy
+  , dcGlobalState :: DecisionGlobalState peerAddr objectId object
+  , dcPrevDecisions :: Map peerAddr (PeerDecision objectId object)
+  }
+  deriving stock Generic
+  deriving anyclass NFData
+
+-- TODO: Using `sized` to control the size, we could maybe directly provide an `Arbitrary` instance for `DecisionContext`?
+
+mkDecisionContext ::
+  forall peerAddr objectId object.
+  ( Arbitrary peerAddr
+  , Arbitrary objectId
+  , Arbitrary object
+  , Ord peerAddr
+  , Ord objectId
+  , Hashable objectId
+  ) =>
+  SMGen ->
+  Int ->
+  -- | If we want to provide a specific decision policy instead of relying on an arbitrary variation of the default one
+  Maybe DecisionPolicy ->
+  DecisionContext peerAddr objectId object
+mkDecisionContext stdGen size mPolicy = unGen gen (QCGen stdGen') size
+ where
+  (salt, stdGen') = nextInt stdGen
+  gen :: Gen (DecisionContext peerAddr objectId object)
+  gen = do
+    dcRng <- mkStdGen <$> arbitrary
+    dcDecisionPolicy <- fromMaybe arbitrary (pure <$> mPolicy)
+    dcGlobalState <- arbitrary
+    dcPrevDecisions <- arbitrary
+    let dcHasObject objId =
+          hashWithSalt salt objId `mod` 2 == 0
+    pure $
+      DecisionContext
+        { dcRng
+        , dcHasObject
+        , dcDecisionPolicy
+        , dcGlobalState
+        , dcPrevDecisions
+        }
+
+strictSeqToSet :: Ord a => StrictSeq a -> Set a
+strictSeqToSet = Set.fromList . Foldable.toList
+
+-- | Make download decisions.
+makeDecisions ::
+  forall peerAddr objectId object.
+  ( Ord peerAddr
+  , Ord objectId
+  ) =>
+  DecisionContext peerAddr objectId object ->
+  -- | New decisions
+  Map peerAddr (PeerDecision objectId object)
+makeDecisions DecisionContext{dcRng, dcHasObject, dcDecisionPolicy, dcGlobalState, dcPrevDecisions} =
+  let
+    -- A subset of peers are currently executing a decision. We shouldn't update the decision for them.
+    frozenPeersToDecisions = Map.filter (\PeerDecision{pdStatus} -> pdStatus == DecisionBeingActedUpon) dcPrevDecisions
+
+    -- We do it in two steps, because computing the acknowledgements tells which objects from the dpsObjectsAvailableIds set of each peer won't actually be available anymore (as soon as we ack them),
+    -- so that the pickObjectsToReq function can take this into account.
+    (ackAndRequestIdsDecisions, peerToIdsToAck) = computeAck dcHasObject dcDecisionPolicy dcGlobalState frozenPeersToDecisions
+    peersToObjectsToReq =
+      pickObjectsToReq
+        dcRng
+        dcHasObject
+        dcDecisionPolicy
+        dcGlobalState
+        frozenPeersToDecisions
+        peerToIdsToAck
+   in
+    Map.intersectionWith
+      (\decision objectsToReqIds -> decision{pdObjectsToReqIds = objectsToReqIds})
+      ackAndRequestIdsDecisions
+      peersToObjectsToReq
+
+-- | The ids to ack are the longest prefix of the outstandingFifo of each peer whose elements match the following criteria:
+-- * either the object is in the owt-pool of the peer that downloaded it
+-- * or the object is already in the pool
+computeAck ::
+  forall peerAddr objectId object.
+  ( Ord peerAddr
+  , Ord objectId
+  ) =>
+  (objectId -> Bool) ->
+  DecisionPolicy ->
+  DecisionGlobalState peerAddr objectId object ->
+  -- | Frozen peers and their previous decisions
+  Map peerAddr (PeerDecision objectId object) ->
+  ( Map peerAddr (PeerDecision objectId object)
+  , Map peerAddr (Set objectId)
+  )
+computeAck poolHasObject DecisionPolicy{dpMaxNumObjectIdsReq, dpMaxNumObjectsOutstanding} DecisionGlobalState{dgsPeerStates} frozenPeersToDecisions =
+  let
+    -- We shouldn't create a new decision for peers that are currently executing a decision
+    filteredPeerStates = Map.withoutKeys dgsPeerStates (Map.keysSet frozenPeersToDecisions)
+    (decisions, peerToIdsToAck) =
+      Map.foldlWithKey' computeAckForPeer (Map.empty, Map.empty) filteredPeerStates
+   in
+    ( decisions
+    , peerToIdsToAck
+    )
+ where
+  computeAckForPeer ::
+    -- \| Accumulator containing the decisions already made for other peers.
+    -- It's a map into which we need to insert the new decision.
+    (Map peerAddr (PeerDecision objectId object), Map peerAddr (Set objectId)) ->
+    peerAddr ->
+    DecisionPeerState objectId object ->
+    (Map peerAddr (PeerDecision objectId object), Map peerAddr (Set objectId))
+  computeAckForPeer (decisionsAcc, peerToIdsToAck) peerAddr DecisionPeerState{dpsOutstandingFifo, dpsObjectsOwtPool, dpsNumIdsInflight} =
+    let
+      -- we isolate the longest prefix of outstandingFifo that matches our ack criteria (see above in the computeAck doc)
+      (idsToAck, dpsOutstandingFifo') =
+        StrictSeq.spanl
+          (\objectId -> poolHasObject objectId || objectId `Map.member` dpsObjectsOwtPool)
+          dpsOutstandingFifo
+
+      pdNumIdsToAck = fromIntegral $ StrictSeq.length idsToAck
+
+      futureFifoSizeOnOutboundPeer :: NumObjectIdsReq =
+        -- the new known fifo state after we ack the idsToAck
+        (fromIntegral $ StrictSeq.length dpsOutstandingFifo')
+          -- plus the number of ids that we have already requested but haven't received yet,
+          -- which the outbound peer might consequently already have added to its fifo
+          + dpsNumIdsInflight
+
+      pdNumIdsToReq =
+        (fromIntegral dpMaxNumObjectsOutstanding - futureFifoSizeOnOutboundPeer)
+          `min` dpMaxNumObjectIdsReq
+
+      -- TODO: in the case where pdNumIdsToReq == 0, we know we actually won't be able to ack anything
+      -- during this round. So it might make sense to set pdNumIdsToAck = 0 and idsToAck = mempty as well?
+      -- If we make this change, we could add an assert in `V2.hs` that whenever pdNumIdsToReq == 0, then pdNumIdsToAck == 0 as well.
+      -- /!\ We should also revise the documentation in V2.md accordingly.
+
+      pdCanPipelineIdsRequests = not . StrictSeq.null $ dpsOutstandingFifo'
+
+      peerDecision =
+        PeerDecision
+          { pdNumIdsToAck
+          , pdNumIdsToReq
+          , pdCanPipelineIdsRequests
+          , pdObjectsToReqIds = Set.empty -- we don't decide this here
+          , pdStatus = DecisionUnread
+          }
+     in
+      ( Map.insert peerAddr peerDecision decisionsAcc
+      , Map.insert peerAddr (strictSeqToSet idsToAck) peerToIdsToAck
+      )
+
+orderPeers ::
+  StdGen ->
+  Map peerAddr (DecisionPeerState objectId object) ->
+  [(peerAddr, DecisionPeerState objectId object)]
+orderPeers _rng = undefined -- TODO
+
+data DownloadPickState peerAddr objectId
+  = DownloadPickState
+  { totalNumObjectsToReq :: !NumObjectsReq
+  , objectMultiplicity :: ObjectMultiplicity
+  , peersToObjectsToReq :: Map peerAddr (Set objectId)
+  }
+
+-- | This function could just be pure if it hadn't been for the rng used to order peers
+pickObjectsToReq ::
+  forall peerAddr objectId object.
+  ( Ord peerAddr
+  , Ord objectId
+  ) =>
+  StdGen ->
+  (objectId -> Bool) ->
+  DecisionPolicy ->
+  DecisionGlobalState peerAddr objectId object ->
+  -- | Frozen peers and their previous decisions
+  Map peerAddr (PeerDecision objectId object) ->
+  -- | map from peer to the set of ids that will be acked for that peer on the next requestIds;
+  -- we should treat these ids as not available anymore for the purpose of picking objects to request
+  Map peerAddr (Set objectId) ->
+  -- | objects to request from each peer
+  Map peerAddr (Set objectId)
+pickObjectsToReq
+  rng
+  poolHasObject
+  DecisionPolicy
+    { dpMaxNumObjectsInflightPerPeer
+    , dpMaxNumObjectsInflightTotal
+    , dpTargetObjectRedundancy
+    }
+  DecisionGlobalState
+    { dgsPeerStates
+    }
+  frozenPeersToDecisions
+  peerToIdsToAck =
+    peersToObjectsToReq
+   where
+    -- We order the peers that are not currently executing a decision
+    orderedPeers = orderPeers rng (dgsPeerStates `Map.withoutKeys` Map.keysSet frozenPeersToDecisions)
+
+    -- We want to map each objectId to the sorted list of peers that can provide it.
+    -- For each peer we also indicate how many objects it has in flight at the moment.
+    -- We filter out here the objects that are already in the pool.
+    objectsToSortedProviders :: Map objectId [(peerAddr, NumObjectsReq)]
+    objectsToSortedProviders =
+      -- We iterate over each peer and the corresponding available ids
+      -- and turn the map "inside-out"
+      Foldable.foldl'
+        ( \accMap (peerAddr, DecisionPeerState{dpsObjectsAvailableIds, dpsObjectsInflightIds}) ->
+            let
+              -- ids that will be acked for this peer won't be available anymore, so we should not consider them in the decision logic
+              --
+              -- TODO: this is quite redundant, because an ack can only be made when the object is already in the pool (in which case it would have been filtered out anyway in the next step) or when the object is in dpsObjectsOwtPool of this peer (in which case it shouldn't be in dpsObjectsAvailableIds anymore)
+              idsToAckForThisPeer =
+                Map.findWithDefault
+                  (error "invariant violated: peer must be in peerToIdsToAck map")
+                  peerAddr
+                  peerToIdsToAck
+              -- we should also remove objects that are already in the pool
+              interestingAndAvailableObjectIds =
+                Set.filter (not . poolHasObject) $
+                  dpsObjectsAvailableIds `Set.difference` idsToAckForThisPeer
+             in
+              -- we iterate over interestingAndAvailableObjectIds and add the peer to the list of providers for each object it can provide
+              Foldable.foldl'
+                ( \accMap' objectId -> Map.insertWith (++) objectId [(peerAddr, fromIntegral $ Set.size dpsObjectsInflightIds)] accMap'
+                )
+                accMap
+                interestingAndAvailableObjectIds
+        )
+        Map.empty
+        orderedPeers
+
+    frozenPeerStatesWithDecisions = Map.intersectionWith (,) dgsPeerStates frozenPeersToDecisions
+
+    availablePeerStates = Map.withoutKeys dgsPeerStates (Map.keysSet frozenPeersToDecisions)
+
+    -- For frozen peers, we consider that the objects in pdObjectsToReqIds will be requested soon, so we treat them as in flight for the purpose of picking objects to request from other peers
+    objectsInFlightMultiplicitiesOfFrozenPeer =
+      Map.foldl'
+        ( \accMap (DecisionPeerState{dpsObjectsInflightIds}, PeerDecision{pdObjectsToReqIds}) ->
+            Foldable.foldl'
+              (\accMap' objectId -> Map.insertWith (+) objectId 1 accMap')
+              accMap
+              (Set.union dpsObjectsInflightIds pdObjectsToReqIds)
+        )
+        Map.empty
+        frozenPeerStatesWithDecisions
+    -- Finally, we add to the previous map the objects that are currently in flight from peers for which we will make a decision in this round
+    objectsInFlightMultiplicities =
+      Map.foldl'
+        ( \accMap (DecisionPeerState{dpsObjectsInflightIds}) ->
+            Foldable.foldl'
+              (\accMap' objectId -> Map.insertWith (+) objectId 1 accMap')
+              accMap
+              dpsObjectsInflightIds
+        )
+        objectsInFlightMultiplicitiesOfFrozenPeer
+        availablePeerStates
+
+    totalNumObjectsInflight :: NumObjectsReq
+    totalNumObjectsInflight = fromIntegral $ Map.foldl' (+) 0 objectsInFlightMultiplicities
+
+    objectsOwtPoolMultiplicities =
+      Map.foldl'
+        ( \accMap (DecisionPeerState{dpsObjectsOwtPool}) ->
+            Foldable.foldl'
+              (\accMap' objectId -> Map.insertWith (+) objectId 1 accMap')
+              accMap
+              (Map.keys dpsObjectsOwtPool)
+        )
+        Map.empty
+        dgsPeerStates
+
+    -- We also want to know for each object how many peers have it in flight or in their owt-pool,
+    -- meaning that we should receive it soon.
+    -- We should also add here the objects that are in the pdObjectsToReqIds of each peer decision for frozen peers,
+    -- if these ids are not already in dpsObjectsInflightIds or dpsObjectsOwtPool of this peer
+    objectsExpectedSoonMultiplicities :: Map objectId ObjectMultiplicity
+    objectsExpectedSoonMultiplicities = Map.unionWith (+) objectsInFlightMultiplicities objectsOwtPoolMultiplicities
+
+    -- Now we join the objectsToSortedProviders and objectsExpectedSoonMultiplicities maps on objectId for an easy fold
+    objectsToProvidersAndExpectedMultiplicities ::
+      Map objectId ([(peerAddr, NumObjectsReq)], ObjectMultiplicity)
+    objectsToProvidersAndExpectedMultiplicities =
+      Map.merge
+        -- if an objectId is missing from objectsExpectedSoonMultiplicities, then its expected multiplicity is 0
+        (Map.mapMissing \_ providers -> (providers, 0))
+        -- if an objectId is missing from objectsToSortedProviders, then we don't care about it
+        Map.dropMissing
+        -- Combine in a tuple the list of providers and the expected multiplicity
+        (Map.zipWithMatched \_ providers expectedMultiplicity -> (providers, expectedMultiplicity))
+        objectsToSortedProviders
+        objectsExpectedSoonMultiplicities
+
+    -- HERE TAKES PLACE THE ACTUAL DECISION LOGIC AND THE ATTRIBUTION OF OBJECTS TO PEERS
+
+    -- The current decision logic is greedy on objects, so it will try to request as many copies of the same object as possible,
+    -- meaning we will have optimal coverage of the first objects, but might not request some other objects at all if they are (only) provided by peers that are already saturated.
+
+    -- Now we compute the actual attribution of downloads to peers
+    DownloadPickState{peersToObjectsToReq} =
+      -- We iterate over each objectId and the corresponding (providers, expectedMultiplicity)
+      Map.foldlWithKey'
+        ( \st objectId (providers, expectedMultiplicity) ->
+            -- reset the objectMultiplicity counter for each new objectId
+            let st' = st{objectMultiplicity = 0}
+             in -- We iterate over the list of providers, and pick them or not according to the current state.
+                -- When a peer is selected as a provider for this objectId, we insert the objectId in the peer's set in peersToObjectsToReq (inside the DownloadPickState),
+                -- so the result of the filtering of providers is part of the final state.
+                Foldable.foldl'
+                  (howToFoldProviders objectId expectedMultiplicity)
+                  st'
+                  providers
+        )
+        DownloadPickState
+          { totalNumObjectsToReq = 0
+          , objectMultiplicity = 0
+          , peersToObjectsToReq = Map.empty
+          }
+        objectsToProvidersAndExpectedMultiplicities
+
+    -- This function decides whether or not we should select a given peer as a provider for the current objectId.
+    -- It takes into account whether we are expecting to obtain the object from other sources (either in flight / in the owt-pool already, or because the object will be requested from already-selected peers in this round).
+    howToFoldProviders ::
+      objectId ->
+      ObjectMultiplicity ->
+      DownloadPickState peerAddr objectId ->
+      (peerAddr, NumObjectsReq) ->
+      DownloadPickState peerAddr objectId
+    howToFoldProviders objectId expectedMultiplicity st@DownloadPickState{totalNumObjectsToReq, objectMultiplicity, peersToObjectsToReq} (peerAddr, numObjectsInFlight) =
+      let
+        -- see what has already been attributed to this peer
+        objectsToReq = Map.findWithDefault Set.empty peerAddr peersToObjectsToReq
+
+        shouldSelect =
+          -- We should not go over the multiplicity limit per object
+          objectMultiplicity + expectedMultiplicity < dpTargetObjectRedundancy
+            -- We should not go over the total number of objects inflight limit
+            &&
totalNumObjectsInflight + totalNumObjectsToReq < dpMaxNumObjectsInflightTotal + -- We should not go over the per-peer number of objects inflight limit + && numObjectsInFlight + (fromIntegral $ Set.size objectsToReq) < dpMaxNumObjectsInflightPerPeer + in + if shouldSelect + then + -- We increase both global count and per-object count, and we add the object to the peer's set + DownloadPickState + { totalNumObjectsToReq = totalNumObjectsToReq + 1 + , objectMultiplicity = objectMultiplicity + 1 + , peersToObjectsToReq = Map.insert peerAddr (Set.insert objectId objectsToReq) peersToObjectsToReq + } + -- Or we keep the state as is if we don't select this peer + else st diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Policy.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Policy.hs new file mode 100644 index 0000000000..77d1ecd85a --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Policy.hs @@ -0,0 +1,92 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} + +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Policy + ( DecisionPolicy (..) + , defaultDecisionPolicy + ) where + +import Control.DeepSeq (NFData) +import GHC.Generics (Generic) +import NoThunks.Class +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types (ObjectMultiplicity (..)) +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + ( NumObjectIdsReq (..) + , NumObjectsOutstanding (..) + , NumObjectsReq (..) + ) +import Test.QuickCheck (Arbitrary (..), Gen, choose) + +-- | Policy for making decisions +data DecisionPolicy = DecisionPolicy + { dpMaxNumObjectIdsReq :: !NumObjectIdsReq + -- ^ a maximal number of objectIds requested at once. + , dpMaxNumObjectsOutstanding :: !NumObjectsOutstanding + -- ^ maximal number of objects in the outstanding FIFO. + , dpMaxNumObjectsInflightPerPeer :: !NumObjectsReq + -- ^ a limit of objects in-flight from a single peer. + , dpMaxNumObjectsInflightTotal :: !NumObjectsReq + -- ^ a limit of objects in-flight from all peers for this node. + , dpTargetObjectRedundancy :: !ObjectMultiplicity + -- ^ from how many peers download the `objectId` simultaneously + } + deriving stock (Show, Eq, Generic) + deriving anyclass (NFData, NoThunks) + +instance Arbitrary DecisionPolicy where + arbitrary = + let DecisionPolicy + { dpMaxNumObjectIdsReq + , dpMaxNumObjectsOutstanding + , dpMaxNumObjectsInflightPerPeer + , dpMaxNumObjectsInflightTotal + , dpTargetObjectRedundancy + } = defaultDecisionPolicy + in DecisionPolicy + <$> (chooseGeometricWithMedian dpMaxNumObjectIdsReq) + <*> (chooseGeometricWithMedian dpMaxNumObjectsOutstanding) + <*> (chooseGeometricWithMedian dpMaxNumObjectsInflightPerPeer) + <*> (chooseGeometricWithMedian dpMaxNumObjectsInflightTotal) + <*> (chooseGeometricWithMedian dpTargetObjectRedundancy) + +defaultDecisionPolicy :: DecisionPolicy +defaultDecisionPolicy = + DecisionPolicy + { dpMaxNumObjectIdsReq = 3 + , dpMaxNumObjectsOutstanding = 10 -- must be the same as the outbound peer's value + , dpMaxNumObjectsInflightPerPeer = 6 + , dpMaxNumObjectsInflightTotal = 20 + , dpTargetObjectRedundancy = 2 + } + +-- TODO: this needs to be tested and inspected + +-- | Geometric-decay generator over [1 .. 
maxBound - 1] for the type 'a'. +-- Smaller values are more likely; the (lower) median is ~ medianTarget. +-- Works for any Integral + Bounded numeric type (e.g., Int, Word32, Int64). +chooseGeometricWithMedian :: forall a. (Integral a, Bounded a) => a -> Gen a +chooseGeometricWithMedian medianTarget + | (maxBound @a) <= 1 = + error "Type's maxBound <= 1: no room for [1..maxBound-1]" + | medianTarget < 1 || medianTarget >= maxBound = + error "medianTarget must be in [1 .. maxBound-1]" + | otherwise = do + let lo = 1 + hi = maxBound - 1 + -- use Integer for counts, Double for CDF inversion + nI = toInteger (hi - lo + 1) + mI = toInteger (medianTarget - lo + 1) + n = fromIntegral nI :: Double + m = fromIntegral mI :: Double + p = 1 - 2 ** (-1 / m) -- set so P(X ≤ median) ≈ 0.5 + q = 1 - p -- decay factor + qn = q ** n -- truncation term + u <- choose (0, 1 :: Double) + let y = 1 - u * (1 - qn) + k = floor (log y / log q) -- inverse truncated geometric CDF + k' = max 0 (min (floor (n - 1)) k) + pure (lo + fromInteger k') diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Registry.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Registry.hs new file mode 100644 index 0000000000..3f9c73106e --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Registry.hs @@ -0,0 +1,334 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry + ( PeerDecisionChannels + , PeerDecisionChannelsVar + , ObjectPoolSem + , DecisionGlobalStateVar + , newPeerDecisionChannelsVar + , newObjectPoolSem + , PeerStateAPI (..) + , withPeer + , decisionLogicThread + ) where + +import Control.Concurrent.Class.MonadSTM.Strict +import Control.Monad (forever, when) +import Control.Monad.Class.MonadFork +import Control.Monad.Class.MonadThrow +import Control.Monad.Class.MonadTime.SI +import Control.Monad.Class.MonadTimer.SI +import Control.Monad.IO.Class (MonadIO) +import Control.Tracer (Tracer, traceWith) +import Data.Foldable as Foldable (traverse_) +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map +import Data.Sequence.Strict qualified as StrictSeq +import Data.Set (Set) +import Data.Set qualified as Set +import Data.Void (Void) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Policy +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.State qualified as State +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Network.Protocol.ObjectDiffusion.Type (NumObjectIdsAck, NumObjectIdsReq) +import System.Random (initStdGen) + +-- | Communication channels between `ObjectDiffusion` mini-protocol inbound side +-- and decision logic. 
+type PeerDecisionChannels m peerAddr objectId object = + Map peerAddr (StrictTVar m (PeerDecision objectId object)) + +type PeerDecisionChannelsVar m peerAddr objectId object = + StrictTVar m (PeerDecisionChannels m peerAddr objectId object) + +newPeerDecisionChannelsVar :: + MonadSTM m => m (PeerDecisionChannelsVar m peerAddr objectId object) +newPeerDecisionChannelsVar = newTVarIO (Map.empty) + +data PeerStateAPI m objectId object = PeerStateAPI + { psaReadDecision :: m (PeerDecision objectId object) + -- ^ A blocking action which reads the `PeerDecision` for this peer from the decision channel. + -- It blocks until a new decision (i.e. with status `DecisionUnread`) is emitted for the peer by the decision thread, + -- and immediately turns its status to `DecisionBeingActedUpon`. + -- + -- PRECONDITIONS: + -- * The decision in the channel has status `DecisionUnread` or `DecisionCompleted` + -- POSTCONDITIONS: + -- * The decision in the channel has status `DecisionBeingActedUpon` + , psaOnDecisionCompleted :: m () + -- ^ To be called by the peer when it has fully executed the decision. + -- Marks the peer as available for the decision logic. + -- + -- PRECONDITIONS: + -- * The decision in the channel has status `DecisionBeingActedUpon` + -- POSTCONDITIONS: + -- * The decision in the channel has status `DecisionCompleted` + , psaOnRequestIds :: NumObjectIdsAck -> NumObjectIdsReq -> m () + -- ^ To be called when emitting a request for new IDs (that also acks previously received IDs that we no longer care about). + -- Under the hood, it will increase the `dpsNumIdsInflight` count by the requested number of IDs, and remove the acked IDs + -- from `dpsOutstandingFifo` and `dpsObjectsAvailableIds`. Note that those IDs may not be present in the latter, if they have + -- already been requested from the outbound peer. + -- + -- PRECONDITIONS: + -- * `dpsOutstandingFifo` has at least `nAck :: NumObjectIdsAck` IDs that will be removed from it + -- POSTCONDITIONS: + -- * The `nAck` first IDs from `dpsOutstandingFifo` are removed from `dpsOutstandingFifo` and removed from `dpsObjectsAvailableIds` + , psaOnRequestObjects :: Set objectId -> m () + -- ^ To be called when emitting a request for new objects. Under the hood, it will remove the requested IDs from `dpsObjectsAvailableIds` + -- and add them to `dpsObjectsInflightIds`. + -- + -- PRECONDITIONS: + -- * The requested IDs are a subset of `dpsObjectsAvailableIds` + -- * The requested IDs are not in `dpsObjectsInflightIds` + -- POSTCONDITIONS: + -- * The requested IDs are removed from `dpsObjectsAvailableIds` + -- * The requested IDs are now in `dpsObjectsInflightIds` + , psaOnReceiveIds :: NumObjectIdsReq -> [objectId] -> m () + -- ^ To be called after receiving new IDs from the outbound peer, after validating that we received the correct number (not more than requested). + -- Under the hood, it will decrease the `dpsNumIdsInflight` count by **the number of IDs that were requested in the request corresponding to this reply**. + -- This number might be more than the number of received IDs. It also adds the received IDs to `dpsOutstandingFifo` and `dpsObjectsAvailableIds`.
+ -- + -- PRECONDITIONS: + -- * The number of received IDs is less than or equal to `nReq :: NumObjectIdsReq` (the number of IDs that were requested in the request corresponding to this reply) + -- * The received IDs are not already in `dpsObjectsAvailableIds` nor in `dpsObjectsInflightIds` nor in `dpsObjectsOwtPool` + -- * The received IDs do not contain duplicates + -- * `dpsNumIdsInflight` is greater than or equal to `nReq :: NumObjectIdsReq` + -- POSTCONDITIONS: + -- * `dpsNumIdsInflight` is decreased by `nReq` + -- * `dpsOutstandingFifo` contains the received IDs appended at the end in the same order as they were received + -- * `dpsObjectsAvailableIds` contains the received IDs + , psaOnReceiveObjects :: [object] -> m () + -- ^ To be called when receiving objects from the outbound peer, after validating that the received objects match exactly the requested IDs. + -- It also checks that all received objects have valid cryptographic proofs. + -- Under the hood, it will remove the received IDs from `dpsObjectsInflightIds`, add the received objects to `dpsObjectsOwtPool`, + -- and call the `submitObjectsToPool` subroutine that will actually insert the objects into the object pool. + -- + -- PRECONDITIONS: + -- * All received objects are valid wrt. their cryptographic proofs/invariants specific to the object type + -- * The received objects correspond exactly to the set of requested objects (order does not matter) + -- * The IDs of the received objects are a subset of `dpsObjectsInflightIds` + -- POSTCONDITIONS: + -- * The IDs of the received objects are removed from `dpsObjectsInflightIds` + -- * `dpsObjectsOwtPool` contains the received objects + } + +-- | A bracket function which registers / de-registers a new peer in +-- `DecisionGlobalStateVar` and `PeerDecisionChannelsVar`, and which exposes a `PeerStateAPI`. +-- `PeerStateAPI` is only safe inside the `withPeer` scope. +withPeer :: + forall object peerAddr objectId m a. + ( MonadMask m + , MonadSTM m + , Ord objectId + , Ord peerAddr + ) => + Tracer m (TraceDecisionLogic peerAddr objectId object) -> + Tracer m (TraceObjectDiffusionInbound objectId object) -> + PeerDecisionChannelsVar m peerAddr objectId object -> + DecisionPolicy -> + DecisionGlobalStateVar m peerAddr objectId object -> + ObjectPoolWriter objectId object m -> + ObjectPoolSem m -> + -- | new peer + peerAddr -> + -- | callback which gives access to `PeerStateAPI` + (PeerStateAPI m objectId object -> m a) -> + m a +withPeer + decisionTracer + objectDiffusionTracer + decisionChannelsVar + _decisionPolicy + globalStateVar + objectPoolWriter + objectPoolSem + peerAddr + withAPI = + bracket registerPeerAndCreateAPI unregisterPeer withAPI + where + registerPeerAndCreateAPI :: m (PeerStateAPI m objectId object) + registerPeerAndCreateAPI = atomically $ do + peerToChannel <- readTVar decisionChannelsVar + decisionChan <- case peerToChannel Map.!?
peerAddr of + -- Check whether a channel already exists for this peer, in which case we reuse it. + -- This should not normally happen, because we unregister the peer from the channels map on disconnection through the bracket function + Just chan -> return chan + -- Otherwise create a new channel and register it + Nothing -> do + newChan <- newTVar unavailableDecision + modifyTVar decisionChannelsVar (Map.insert peerAddr newChan) + return newChan + + let !inboundPeerAPI = + PeerStateAPI + { psaReadDecision = atomically $ do + -- This should block until the decision has status `DecisionUnread`, + -- which means it is a new decision that the peer has not acted upon yet. + -- If `DecisionCompleted` is read here, it means the decision logic hasn't had time to make a new decision for this peer + decision@PeerDecision{pdStatus} <- readTVar decisionChan + when (pdStatus == DecisionBeingActedUpon) $ + error "Forgot to call `psaOnDecisionCompleted` for this peer" + check $ pdStatus == DecisionUnread + let decision' = decision{pdStatus = DecisionBeingActedUpon} + writeTVar decisionChan decision' + return decision' + , psaOnDecisionCompleted = atomically $ do + decision@PeerDecision{pdStatus} <- readTVar decisionChan + when (pdStatus == DecisionUnread) $ + error + "Forgot to call `psaReadDecision` for this peer, or the decision thread has mistakenly updated the decision for this peer while it was executing it" + when (pdStatus == DecisionCompleted) $ + error "`psaOnDecisionCompleted` has already been called for this peer" + let decision' = decision{pdStatus = DecisionCompleted} + writeTVar decisionChan decision' + , psaOnRequestIds = + State.onRequestIds + objectDiffusionTracer + decisionTracer + globalStateVar + peerAddr + , psaOnRequestObjects = + State.onRequestObjects + objectDiffusionTracer + decisionTracer + globalStateVar + peerAddr + , psaOnReceiveIds = + State.onReceiveIds + objectDiffusionTracer + decisionTracer + objectPoolWriter + globalStateVar + peerAddr + , psaOnReceiveObjects = \objects -> do + PeerDecision{pdObjectsToReqIds} <- atomically $ readTVar decisionChan + State.onReceiveObjects + objectDiffusionTracer + decisionTracer + globalStateVar + objectPoolWriter + objectPoolSem + peerAddr + pdObjectsToReqIds + objects + } + + -- register the peer in the global state now + modifyTVar globalStateVar registerPeerGlobalState + -- initialization is complete for this peer; it can proceed and + -- interact through its given API + return inboundPeerAPI + where + + unregisterPeer :: PeerStateAPI m objectId object -> m () + unregisterPeer _api = + -- the handler is a short blocking operation, thus we need to use + -- `uninterruptibleMask_` + uninterruptibleMask_ $ atomically $ do + -- unregister the peer from the global state + modifyTVar globalStateVar unregisterPeerGlobalState + -- remove the channel of this peer from the global channel map + modifyTVar decisionChannelsVar (Map.delete peerAddr) + + registerPeerGlobalState :: + DecisionGlobalState peerAddr objectId object -> + DecisionGlobalState peerAddr objectId object + registerPeerGlobalState st@DecisionGlobalState{dgsPeerStates} = + st + { dgsPeerStates = + Map.insert + peerAddr + DecisionPeerState + { dpsObjectsAvailableIds = Set.empty + , dpsNumIdsInflight = 0 + , dpsObjectsInflightIds = Set.empty + , dpsOutstandingFifo = StrictSeq.empty + , dpsObjectsOwtPool = Map.empty + } + dgsPeerStates + } + + -- TODO: this function needs to be tested!
+ -- Issue: https://github.com/IntersectMBO/ouroboros-network/issues/5151 + unregisterPeerGlobalState :: + DecisionGlobalState peerAddr objectId object -> + DecisionGlobalState peerAddr objectId object + unregisterPeerGlobalState + st@DecisionGlobalState + { dgsPeerStates + } = + st + { dgsPeerStates = Map.delete peerAddr dgsPeerStates + } + +decisionLogicThread :: + forall m peerAddr objectId object. + ( MonadDelay m + , MonadSTM m + , MonadFork m + , MonadIO m + , Ord peerAddr + , Ord objectId + ) => + Tracer m (TraceDecisionLogic peerAddr objectId object) -> + Tracer m ObjectDiffusionCounters -> + ObjectPoolWriter objectId object m -> + DecisionPolicy -> + PeerDecisionChannelsVar m peerAddr objectId object -> + DecisionGlobalStateVar m peerAddr objectId object -> + m Void +decisionLogicThread decisionTracer countersTracer ObjectPoolWriter{opwHasObject} decisionPolicy decisionChannelsVar globalStateVar = do + labelThisThread "ObjectDiffusionInbound.decisionLogicThread" + forever $ do + -- We rate-limit the decision making process, as it could otherwise overwhelm + -- the CPU when there are many inbound connections. + threadDelay const_DECISION_LOOP_DELAY + + rng <- initStdGen + + -- TODO: can we make this whole block atomic? + -- because makeDecisions should be atomic with respect to reading the global state and + -- reading the previous decisions + (newDecisions, counters) <- atomically $ do + decisionsChannels <- readTVar decisionChannelsVar + prevDecisions <- traverse readTVar decisionsChannels + globalState <- readTVar globalStateVar + hasObject <- opwHasObject + let newDecisions = + makeDecisions + DecisionContext + { dcRng = rng + , dcHasObject = hasObject + , dcDecisionPolicy = decisionPolicy + , dcGlobalState = globalState + , dcPrevDecisions = prevDecisions + } + + peerToChannel <- readTVar decisionChannelsVar + -- Pair each decision channel with the corresponding decision + let peerToChannelAndDecision = + Map.intersectionWith + (,) + peerToChannel + newDecisions + -- Send the newDecisions to the corresponding peers + traverse_ + (\(chan, decision) -> writeTVar chan decision) + peerToChannelAndDecision + + -- Return values for tracing purposes + let counters = makeObjectDiffusionCounters globalState + return (newDecisions, counters) + + traceWith decisionTracer (TraceDecisionLogicDecisionsMade newDecisions) + traceWith countersTracer counters + +-- `5ms` delay +const_DECISION_LOOP_DELAY :: DiffTime +const_DECISION_LOOP_DELAY = 0.005
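For intuition, here is a stripped-down, self-contained STM model of the status handshake that `psaReadDecision` / `psaOnDecisionCompleted` implement over a decision channel. The `Status` type and the function names are illustrative stand-ins, and the model simply retries where the real code raises an error on a protocol-logic violation:

import Control.Concurrent.STM

data Status = Unread | BeingActedUpon | Completed
  deriving (Eq, Show)

-- Peer side: block until the decision thread publishes a fresh decision,
-- then claim it (cf. psaReadDecision).
claimDecision :: TVar Status -> IO ()
claimDecision var = atomically $ do
  st <- readTVar var
  check (st == Unread) -- retries while BeingActedUpon or Completed
  writeTVar var BeingActedUpon

-- Peer side: hand the slot back once the decision has been fully executed
-- (cf. psaOnDecisionCompleted).
completeDecision :: TVar Status -> IO ()
completeDecision var = atomically $ writeTVar var Completed

-- Decision-thread side: publish a new decision for the peer to pick up.
publishDecision :: TVar Status -> IO ()
publishDecision var = atomically $ writeTVar var Unread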
diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/State.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/State.hs new file mode 100644 index 0000000000..a3d237be48 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/State.hs @@ -0,0 +1,452 @@ +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.State + ( -- * Core API + DecisionGlobalState (..) + , DecisionPeerState (..) + , onRequestIds + , onRequestObjects + , onReceiveIds + , onReceiveObjects + ) where + +import Control.Concurrent.Class.MonadSTM.Strict +import Control.Concurrent.Class.MonadSTM.TSem +import Control.Exception (assert, throw) +import Control.Monad (when) +import Control.Tracer (Tracer, traceWith) +import Data.Foldable qualified as Foldable +import Data.Map.Strict (Map, findWithDefault) +import Data.Map.Strict qualified as Map +import Data.Sequence.Strict qualified as StrictSeq +import Data.Set (Set, (\\)) +import Data.Set qualified as Set +import GHC.Stack (HasCallStack) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API (ObjectPoolWriter (..)) +import Ouroboros.Consensus.Util.IOLike (MonadMask, bracket_) +import Ouroboros.Network.Protocol.ObjectDiffusion.Type (NumObjectIdsAck, NumObjectIdsReq) + +onRequestIds :: + forall m peerAddr object objectId. + (MonadSTM m, Ord objectId, Ord peerAddr) => + Tracer m (TraceObjectDiffusionInbound objectId object) -> + Tracer m (TraceDecisionLogic peerAddr objectId object) -> + DecisionGlobalStateVar m peerAddr objectId object -> + peerAddr -> + NumObjectIdsAck -> + -- | number of ids to request + NumObjectIdsReq -> + m () +onRequestIds + odTracer + decisionTracer + globalStateVar + peerAddr + numIdsToAck + numIdsToReq = do + globalState' <- atomically $ do + stateTVar + globalStateVar + ( \globalState -> + let globalState' = onRequestIdsImpl peerAddr numIdsToAck numIdsToReq globalState + in (globalState', globalState') + ) + traceWith odTracer (TraceObjectDiffusionInboundRequestedIds (fromIntegral numIdsToReq)) + traceWith decisionTracer (TraceDecisionLogicGlobalStateUpdated "onRequestIds" globalState') + +-- Acknowledgement happens when a requestIds message is sent. +-- That's why we update the dpsOutstandingFifo and dpsObjectsAvailableIds here. +onRequestIdsImpl :: + forall peerAddr object objectId. + (Ord objectId, Ord peerAddr) => + peerAddr -> + NumObjectIdsAck -> + -- | number of ids to request + NumObjectIdsReq -> + DecisionGlobalState peerAddr objectId object -> + DecisionGlobalState peerAddr objectId object +onRequestIdsImpl + peerAddr + numIdsToAck + numIdsToReq + globalState@DecisionGlobalState + { dgsPeerStates + } = + globalState + { dgsPeerStates = dgsPeerStates' + } + where + dgsPeerStates' = + Map.adjust + ( \ps@DecisionPeerState{dpsNumIdsInflight, dpsOutstandingFifo, dpsObjectsAvailableIds} -> + -- we isolate the longest prefix of outstandingFifo that matches our ack criteria (see above in computeAck doc) + let + -- We compute the ids to ack and the new state of the FIFO based on the number of ids to ack given by the decision logic + (idsToAck, dpsOutstandingFifo') = + assert (StrictSeq.length dpsOutstandingFifo >= fromIntegral numIdsToAck) $ + StrictSeq.splitAt + (fromIntegral numIdsToAck) + dpsOutstandingFifo + + -- We remove the acknowledged ids from dpsObjectsAvailableIds if they were present. + -- We need to do that because objects that were advertised by the corresponding outbound peer + -- but never downloaded (because we already have them in the pool) were consequently never removed + -- from dpsObjectsAvailableIds by onRequestObjects + dpsObjectsAvailableIds' = + Foldable.foldl' (\set objectId -> Set.delete objectId set) dpsObjectsAvailableIds idsToAck + in + ps + { dpsNumIdsInflight = dpsNumIdsInflight + numIdsToReq + , dpsOutstandingFifo = dpsOutstandingFifo' + , dpsObjectsAvailableIds = dpsObjectsAvailableIds' + } + ) + peerAddr + dgsPeerStates
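The ack bookkeeping above splits off the acknowledged prefix of the FIFO and sweeps those ids out of the available set. A small self-contained sketch of that step, using Data.Sequence in place of the package-internal strict sequence; the `ackPrefix` name and the toy ids are hypothetical:

import Data.Foldable (foldl')
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Data.Set (Set)
import qualified Data.Set as Set

-- Ack the first nAck ids of the outstanding FIFO and delete them from the
-- available set, mirroring onRequestIdsImpl.
ackPrefix :: Ord a => Int -> Seq a -> Set a -> (Seq a, Set a)
ackPrefix nAck fifo available =
  let (acked, fifo') = Seq.splitAt nAck fifo
      available' = foldl' (flip Set.delete) available acked
   in (fifo', available')

-- ghci> ackPrefix 2 (Seq.fromList [1,2,3,4]) (Set.fromList [2,3,4 :: Int])
-- (fromList [3,4],fromList [3,4])

In the example, id 1 is in the FIFO but already absent from the available set, mirroring the caveat in the `psaOnRequestIds` documentation that acked ids may have already been requested from the outbound peer.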
+onRequestObjects :: + forall m peerAddr object objectId. + (MonadSTM m, Ord objectId, Ord peerAddr) => + Tracer m (TraceObjectDiffusionInbound objectId object) -> + Tracer m (TraceDecisionLogic peerAddr objectId object) -> + DecisionGlobalStateVar m peerAddr objectId object -> + peerAddr -> + -- | objects to request, by id + Set objectId -> + m () +onRequestObjects odTracer decisionTracer globalStateVar peerAddr objectIds = do + globalState' <- atomically $ do + stateTVar + globalStateVar + ( \globalState -> + let globalState' = onRequestObjectsImpl peerAddr objectIds globalState + in (globalState', globalState') + ) + traceWith odTracer (TraceObjectDiffusionInboundRequestedObjects (Set.size objectIds)) + traceWith decisionTracer (TraceDecisionLogicGlobalStateUpdated "onRequestObjects" globalState') + +onRequestObjectsImpl :: + forall peerAddr object objectId. + (Ord objectId, Ord peerAddr) => + peerAddr -> + -- | objects to request, by id + Set objectId -> + DecisionGlobalState peerAddr objectId object -> + DecisionGlobalState peerAddr objectId object +onRequestObjectsImpl + peerAddr + objectIds + globalState@DecisionGlobalState + { dgsPeerStates + } = + globalState + { dgsPeerStates = dgsPeerStates' + } + where + dgsPeerStates' = + Map.adjust + ( \ps@DecisionPeerState{dpsObjectsAvailableIds, dpsObjectsInflightIds} -> + assert + ( objectIds `Set.isSubsetOf` dpsObjectsAvailableIds + && Set.null (objectIds `Set.intersection` dpsObjectsInflightIds) + ) + $ ps + { dpsObjectsAvailableIds = dpsObjectsAvailableIds \\ objectIds + , dpsObjectsInflightIds = dpsObjectsInflightIds `Set.union` objectIds + } + ) + peerAddr + dgsPeerStates + +-- | Wrapper around `onReceiveIdsImpl`. +-- Obtains the `hasObject` function atomically from the STM context and +-- updates and traces the global state TVar. +onReceiveIds :: + forall m peerAddr object objectId. + (MonadSTM m, Ord objectId, Ord peerAddr) => + Tracer m (TraceObjectDiffusionInbound objectId object) -> + Tracer m (TraceDecisionLogic peerAddr objectId object) -> + ObjectPoolWriter objectId object m -> + DecisionGlobalStateVar m peerAddr objectId object -> + peerAddr -> + -- | number of requests to subtract from + -- `dpsNumIdsInflight` + NumObjectIdsReq -> + -- | sequence of received `objectId`s + [objectId] -> + m () +onReceiveIds + odTracer + decisionTracer + ObjectPoolWriter{opwHasObject} + globalStateVar + peerAddr + numIdsInitiallyRequested + receivedIds = do + peerState <- atomically $ ((Map.! peerAddr) .
dgsPeerStates) <$> readTVar globalStateVar + hasObject <- atomically opwHasObject + checkProtocolErrors hasObject peerState numIdsInitiallyRequested receivedIds + globalState' <- atomically $ do + stateTVar + globalStateVar + ( \globalState -> + let globalState' = onReceiveIdsImpl peerAddr numIdsInitiallyRequested receivedIds globalState + in (globalState', globalState') + ) + traceWith odTracer (TraceObjectDiffusionInboundReceivedIds (length receivedIds)) + traceWith decisionTracer (TraceDecisionLogicGlobalStateUpdated "onReceiveIds" globalState') + where + checkProtocolErrors :: + (objectId -> Bool) -> + DecisionPeerState objectId object -> + NumObjectIdsReq -> + [objectId] -> + m () + checkProtocolErrors hasObject DecisionPeerState{dpsObjectsAvailableIds, dpsObjectsInflightIds} nReq ids = do + when (length ids > fromIntegral nReq) $ throw ProtocolErrorObjectIdsNotRequested + let idSet = Set.fromList ids + when (length ids /= Set.size idSet) $ throw ProtocolErrorObjectIdsDuplicate + when + ( (not $ Set.null $ idSet `Set.intersection` dpsObjectsAvailableIds) + || (not $ Set.null $ idSet `Set.intersection` dpsObjectsInflightIds) + || (any hasObject ids) + ) + $ throw ProtocolErrorObjectIdAlreadyKnown + +onReceiveIdsImpl :: + forall peerAddr object objectId. + (Ord objectId, Ord peerAddr, HasCallStack) => + peerAddr -> + -- | number of requests to subtract from + -- `dpsNumIdsInflight` + NumObjectIdsReq -> + -- | sequence of received `objectId`s + [objectId] -> + DecisionGlobalState peerAddr objectId object -> + DecisionGlobalState peerAddr objectId object +onReceiveIdsImpl + peerAddr + numIdsInitiallyRequested + receivedIds + globalState@DecisionGlobalState + { dgsPeerStates + } = + globalState + { dgsPeerStates = dgsPeerStates' + } + where + peerState@DecisionPeerState + { dpsOutstandingFifo + , dpsObjectsAvailableIds + , dpsNumIdsInflight + } = + findWithDefault + (error "ObjectDiffusion.onReceiveIdsImpl: the peer should appear in dgsPeerStates") + peerAddr + dgsPeerStates + + -- Note that we don't need to filter out ids of objects already in the pool here: + -- makeDecisions is the only reader of dpsObjectsAvailableIds + -- and will filter it when needed against the current state of the object + -- pool. + dpsObjectsAvailableIds' = + dpsObjectsAvailableIds `Set.union` Set.fromList receivedIds + + -- Add received objectIds to `dpsOutstandingFifo`. + dpsOutstandingFifo' = dpsOutstandingFifo <> StrictSeq.fromList receivedIds + + peerState' = + assert + (dpsNumIdsInflight >= numIdsInitiallyRequested) + peerState + { dpsObjectsAvailableIds = dpsObjectsAvailableIds' + , dpsOutstandingFifo = dpsOutstandingFifo' + , dpsNumIdsInflight = dpsNumIdsInflight - numIdsInitiallyRequested + } + + dgsPeerStates' = Map.insert peerAddr peerState' dgsPeerStates + +-- | Wrapper around `onReceiveObjectsImpl` that updates and traces the +-- global state TVar. +onReceiveObjects :: + forall m peerAddr object objectId.
+ ( MonadSTM m + , MonadMask m + , Ord objectId + , Ord peerAddr + ) => + Tracer m (TraceObjectDiffusionInbound objectId object) -> + Tracer m (TraceDecisionLogic peerAddr objectId object) -> + DecisionGlobalStateVar m peerAddr objectId object -> + ObjectPoolWriter objectId object m -> + ObjectPoolSem m -> + peerAddr -> + -- | requested objects + Set objectId -> + -- | received objects + [object] -> + m () +onReceiveObjects + odTracer + tracer + globalStateVar + objectPoolWriter + poolSem + peerAddr + objectsRequestedIds + objectsReceived = do + let getId = opwObjectId objectPoolWriter + let objectsReceivedMap = Map.fromList $ (\obj -> (getId obj, obj)) <$> objectsReceived + checkProtocolErrors objectsRequestedIds objectsReceivedMap + globalState' <- atomically $ do + stateTVar + globalStateVar + ( \globalState -> + let globalState' = + onReceiveObjectsImpl + peerAddr + objectsReceivedMap + globalState + in (globalState', globalState') + ) + traceWith odTracer (TraceObjectDiffusionInboundReceivedObjects (length objectsReceived)) + traceWith tracer (TraceDecisionLogicGlobalStateUpdated "onReceiveObjects" globalState') + submitObjectsToPool + odTracer + tracer + globalStateVar + objectPoolWriter + poolSem + peerAddr + objectsReceivedMap + where + checkProtocolErrors :: + Set objectId -> + Map objectId object -> + m () + checkProtocolErrors requested received' = do + let received = Map.keysSet received' + when (not $ Set.null $ requested \\ received) $ throw ProtocolErrorObjectMissing + when (not $ Set.null $ received \\ requested) $ throw ProtocolErrorObjectNotRequested + +onReceiveObjectsImpl :: + forall peerAddr object objectId. + ( Ord peerAddr + , Ord objectId + ) => + peerAddr -> + -- | received objects + Map objectId object -> + DecisionGlobalState peerAddr objectId object -> + DecisionGlobalState peerAddr objectId object +onReceiveObjectsImpl + peerAddr + objectsReceived + st@DecisionGlobalState + { dgsPeerStates + } = + st + { dgsPeerStates = dgsPeerStates' + } + where + objectsReceivedIds = Map.keysSet objectsReceived + + peerState@DecisionPeerState + { dpsObjectsInflightIds + , dpsObjectsOwtPool + } = + findWithDefault + (error "ObjectDiffusion.onReceiveObjectsImpl: the peer should appear in dgsPeerStates") + peerAddr + dgsPeerStates + + -- subtract requested from in-flight + dpsObjectsInflightIds' = + assert (objectsReceivedIds `Set.isSubsetOf` dpsObjectsInflightIds) $ + dpsObjectsInflightIds \\ objectsReceivedIds + + dpsObjectsOwtPool' = dpsObjectsOwtPool <> objectsReceived + + peerState' = + peerState + { dpsObjectsInflightIds = dpsObjectsInflightIds' + , dpsObjectsOwtPool = dpsObjectsOwtPool' + } + + dgsPeerStates' = Map.insert peerAddr peerState' dgsPeerStates + +submitObjectsToPool :: + forall m peerAddr object objectId. 
+ ( Ord objectId + , Ord peerAddr + , MonadMask m + , MonadSTM m + ) => + Tracer m (TraceObjectDiffusionInbound objectId object) -> + Tracer m (TraceDecisionLogic peerAddr objectId object) -> + DecisionGlobalStateVar m peerAddr objectId object -> + ObjectPoolWriter objectId object m -> + ObjectPoolSem m -> + peerAddr -> + Map objectId object -> + m () +submitObjectsToPool + odTracer + decisionTracer + globalStateVar + objectPoolWriter + (ObjectPoolSem poolSem) + peerAddr + objects = do + let getId = opwObjectId objectPoolWriter + + bracket_ + (atomically $ waitTSem poolSem) + (atomically $ signalTSem poolSem) + $ do + -- Now that the lock over the object pool is held + opwAddObjects objectPoolWriter (Map.elems objects) + traceWith odTracer $ + TraceObjectDiffusionInboundAddedObjects $ + length objects + + -- Move objects from `owtPool` to `inPool` state + globalState' <- atomically $ stateTVar globalStateVar $ \globalState -> + let globalState' = + Foldable.foldl' + (\st object -> updateStateWhenObjectAddedToPool (getId object) st) + globalState + objects + in (globalState', globalState') + traceWith + decisionTracer + ( TraceDecisionLogicGlobalStateUpdated + "submitObjectsToPool.updateStateWhenObjectAddedToPool" + globalState' + ) + where + updateStateWhenObjectAddedToPool :: + objectId -> + DecisionGlobalState peerAddr objectId object -> + DecisionGlobalState peerAddr objectId object + updateStateWhenObjectAddedToPool + objectId + st@DecisionGlobalState + { dgsPeerStates + } = + st + { dgsPeerStates = dgsPeerStates' + } + where + dgsPeerStates' = + Map.adjust + ( \ps@DecisionPeerState{dpsObjectsOwtPool} -> ps{dpsObjectsOwtPool = Map.delete objectId dpsObjectsOwtPool} + ) + peerAddr + dgsPeerStates diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Types.hs new file mode 100644 index 0000000000..3cfdef80da --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/V2/Types.hs @@ -0,0 +1,348 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types + ( -- * DecisionPeerState + DecisionPeerState (..) + + -- * DecisionGlobalState + , DecisionGlobalState (..) + , dgsObjectsAvailableMultiplicities + , dgsObjectsInflightMultiplicities + , dgsObjectsOwtPoolMultiplicities + , DecisionGlobalStateVar + , newDecisionGlobalStateVar + + -- * Decisions + , PeerDecision (..) + , PeerDecisionStatus (..) + , unavailableDecision + + -- * Tracing + , TraceDecisionLogic (..) + , ObjectMultiplicity (..) + + -- * Reporting + , ObjectDiffusionCounters (..) + , makeObjectDiffusionCounters + + -- * Copied from V1 + , NumObjectsProcessed (..) + , TraceObjectDiffusionInbound (..) + , ObjectDiffusionInboundError (..) + + -- * Object pool semaphore + , ObjectPoolSem (..)
+ , newObjectPoolSem + ) where + +import Control.Concurrent.Class.MonadSTM.Strict (MonadSTM, StrictTVar, atomically, newTVarIO) +import Control.Concurrent.Class.MonadSTM.TSem (TSem, newTSem) +import Control.DeepSeq (NFData (..)) +import Control.Exception (Exception (..)) +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map +import Data.Monoid (Sum (..)) +import Data.Sequence.Strict (StrictSeq, fromList) +import Data.Set (Set) +import Data.Word (Word64) +import GHC.Generics (Generic) +import GHC.Stack (HasCallStack) +import NoThunks.Class (NoThunks (..)) +import Ouroboros.Network.ControlMessage (ControlMessage) +import Ouroboros.Network.Protocol.ObjectDiffusion.Type +import Quiet (Quiet (..)) +import Test.QuickCheck (Arbitrary (..), elements) + +-- | Semaphore to guard access to the ObjectPool +newtype ObjectPoolSem m = ObjectPoolSem (TSem m) + +newObjectPoolSem :: MonadSTM m => m (ObjectPoolSem m) +newObjectPoolSem = ObjectPoolSem <$> atomically (newTSem 1) + +-- +-- DecisionPeerState, DecisionGlobalState +-- + +-- | In all field names: if "Ids" appears at the beginning of a field name, +-- it means we refer to IDs specifically (i.e. before the corresponding object is in flight). +-- On the other hand, a field name of the form "Objects...Ids" means we are +-- speaking of objects (i.e. after they have been requested) but identify them +-- by their IDs for this field's purpose. +data DecisionPeerState objectId object = DecisionPeerState + { dpsNumIdsInflight :: !NumObjectIdsReq + -- ^ The number of object identifiers that we have requested but + -- which have not yet been replied to. We need to track this to keep + -- our requests within the limit on the number of unacknowledged objectIds. + , dpsOutstandingFifo :: !(StrictSeq objectId) + -- ^ Those objects (by their identifier) that the client has told + -- us about, and which we have not yet acknowledged. This is kept in + -- the order in which the client gave them to us. This is the same order + -- in which we submit them to the objectpool. It is also the order + -- in which we acknowledge them. + , dpsObjectsAvailableIds :: !(Set objectId) + -- ^ Set of known object ids which can be requested from this peer. + , dpsObjectsInflightIds :: !(Set objectId) + -- ^ The set of requested objects (by their ids). + -- , dpsObjectsRequestedButNotReceivedIds :: !(Set objectId) + -- ^ A subset of `dpsOutstandingFifo` which were unknown to the peer + -- (i.e. requested but not received). We need to track these `objectId`s + -- since they need to be acknowledged. + , dpsObjectsOwtPool :: !(Map objectId object) + -- ^ A set of objects on their way to the objectpool. + -- Tracked here so that we can clean up `dgsObjectsOwtPoolMultiplicities` if the + -- peer dies. + -- + -- Life cycle of entries: + -- * added by `acknowledgeObjectIds` (where we decide which objects can be + -- submitted to the objectpool) + -- * removed by `withObjectPoolSem` + } + deriving stock (Show, Eq, Generic) + deriving anyclass (NFData, NoThunks) + +instance + ( Arbitrary objectId + , Arbitrary object + , Ord objectId + ) => + Arbitrary (DecisionPeerState objectId object) + where + arbitrary = + DecisionPeerState + <$> (NumObjectIdsReq <$> arbitrary) + <*> (fromList <$> arbitrary) + <*> arbitrary + <*> arbitrary + <*> arbitrary + +-- | Shared state of all `ObjectDiffusion` clients. +data DecisionGlobalState peerAddr objectId object = DecisionGlobalState + { dgsPeerStates :: !(Map peerAddr (DecisionPeerState objectId object)) + -- ^ Map of peer states.
+ -- + -- /Invariant:/ for peerAddr's which are registered using `withPeer`, + -- there's always an entry in this map even if the set of `objectId`s is + -- empty. + } + deriving stock (Show, Eq, Generic) + deriving anyclass (NFData, NoThunks) + +instance + ( Arbitrary peerAddr + , Arbitrary object + , Arbitrary objectId + , Ord peerAddr + , Ord objectId + ) => + Arbitrary (DecisionGlobalState peerAddr objectId object) + where + arbitrary = DecisionGlobalState <$> arbitrary + +-- | Merge the dpsObjectsAvailableIds of all peers of the global state into a multiplicity map. +dgsObjectsAvailableMultiplicities :: + Ord objectId => DecisionGlobalState peerAddr objectId object -> Map objectId ObjectMultiplicity +dgsObjectsAvailableMultiplicities DecisionGlobalState{dgsPeerStates} = + Map.unionsWith + (+) + (Map.fromSet (const 1) . dpsObjectsAvailableIds <$> Map.elems dgsPeerStates) + +dgsObjectsInflightMultiplicities :: + Ord objectId => DecisionGlobalState peerAddr objectId object -> Map objectId ObjectMultiplicity +dgsObjectsInflightMultiplicities DecisionGlobalState{dgsPeerStates} = + Map.unionsWith + (+) + (Map.fromSet (const 1) . dpsObjectsInflightIds <$> Map.elems dgsPeerStates) + +dgsObjectsOwtPoolMultiplicities :: + Ord objectId => DecisionGlobalState peerAddr objectId object -> Map objectId ObjectMultiplicity +dgsObjectsOwtPoolMultiplicities DecisionGlobalState{dgsPeerStates} = + Map.unionsWith + (+) + (Map.fromSet (const 1) . Map.keysSet . dpsObjectsOwtPool <$> Map.elems dgsPeerStates) + +type DecisionGlobalStateVar m peerAddr objectId object = + StrictTVar m (DecisionGlobalState peerAddr objectId object) + +newDecisionGlobalStateVar :: + MonadSTM m => + m (DecisionGlobalStateVar m peerAddr objectId object) +newDecisionGlobalStateVar = + newTVarIO + DecisionGlobalState + { dgsPeerStates = Map.empty + } + +-- +-- Decisions +-- + +-- | Decision made by the decision logic. Each peer will receive a 'PeerDecision'. +-- +-- /note:/ it is rather non-standard to represent a choice between requesting +-- `objectId`s and `object`s as a product rather than a sum type. The client will +-- need to download `object`s first and then send a request for more objectIds (and +-- acknowledge some `objectId`s). Due to pipelining each client will request a +-- decision from the decision logic quite often (every two pipelined requests). +-- +-- TODO: in the previous design, we prefiltered active peers before calling +-- `makeDecision`, so that a decision once taken would make the peer non-active +-- (e.g. it would not be returned by `filterActivePeers`) for longer, and thus the +-- expensive `makeDecision` computation would not need to take that peer into +-- account. This is no longer the case, but we could reintroduce this optimization +-- if needed. +data PeerDecision objectId object = PeerDecision + { pdNumIdsToAck :: !NumObjectIdsAck + -- ^ number of objectIds to acknowledge + , pdNumIdsToReq :: !NumObjectIdsReq + -- ^ number of objectIds to request + , pdCanPipelineIdsRequests :: !Bool + -- ^ the object diffusion protocol only allows pipelining `objectId` requests + -- if we have non-acknowledged `objectId`s. + , pdObjectsToReqIds :: !(Set objectId) + -- ^ objectIds to download.
+ , pdStatus :: !PeerDecisionStatus + -- ^ Whether the peer is currently acting on this decision + } + deriving stock (Show, Eq, Generic) + deriving anyclass (NFData, NoThunks) + +instance + ( Arbitrary objectId + , Ord objectId + ) => + Arbitrary (PeerDecision objectId object) + where + arbitrary = + PeerDecision + <$> (NumObjectIdsAck <$> arbitrary) + <*> (NumObjectIdsReq <$> arbitrary) + <*> arbitrary + <*> arbitrary + <*> arbitrary + +data PeerDecisionStatus + = DecisionUnread + | DecisionBeingActedUpon + | DecisionCompleted + deriving stock (Show, Eq, Generic) + deriving anyclass (NFData, NoThunks) + +instance Arbitrary PeerDecisionStatus where + arbitrary = + elements + [ DecisionUnread + , DecisionBeingActedUpon + , DecisionCompleted + ] + +-- | A placeholder when no decision has been made, at the beginning of a loop. +-- Nothing should be read from it except its status. +unavailableDecision :: HasCallStack => PeerDecision objectId object +unavailableDecision = + PeerDecision + { pdStatus = DecisionCompleted + , pdObjectsToReqIds = error "This decision is not available yet" + , pdNumIdsToAck = error "This decision is not available yet" + , pdNumIdsToReq = error "This decision is not available yet" + , pdCanPipelineIdsRequests = error "This decision is not available yet" + } + +-- | Decision logic tracer. +data TraceDecisionLogic peerAddr objectId object + = TraceDecisionLogicGlobalStateUpdated String (DecisionGlobalState peerAddr objectId object) + | TraceDecisionLogicDecisionsMade (Map peerAddr (PeerDecision objectId object)) + deriving stock (Show, Eq, Generic) + +data ObjectDiffusionCounters + = ObjectDiffusionCounters + { odcNumDistinctObjectsAvailable :: Int + -- ^ number of distinct objectIds which are not yet downloaded. + , odcNumDistinctObjectsInflight :: Int + -- ^ number of distinct in-flight objects. + , odcNumTotalObjectsInflight :: Int + -- ^ number of all in-flight objects. + , odcNumDistinctObjectsOwtPool :: Int + -- ^ number of distinct objects which are waiting to be added to the + -- objectpool (each peer needs to acquire the semaphore to actually add + -- them to the pool) + } + deriving stock (Show, Eq, Generic) + +makeObjectDiffusionCounters :: + Ord objectId => + DecisionGlobalState peerAddr objectId object -> + ObjectDiffusionCounters +makeObjectDiffusionCounters + dgs = + ObjectDiffusionCounters + { odcNumDistinctObjectsAvailable = Map.size $ dgsObjectsAvailableMultiplicities dgs + , odcNumDistinctObjectsInflight = Map.size $ dgsObjectsInflightMultiplicities dgs + , odcNumTotalObjectsInflight = + fromIntegral . mconcat . Map.elems $ dgsObjectsInflightMultiplicities dgs + , odcNumDistinctObjectsOwtPool = Map.size $ dgsObjectsOwtPoolMultiplicities dgs + }
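The three multiplicity helpers used by these counters all share one shape: per-peer id sets collapse into a map of counts. A tiny self-contained sketch of that shape, with toy Int ids and the hypothetical name `multiplicities`:

import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Set (Set)
import qualified Data.Set as Set

-- Each peer contributes a set of ids; unionsWith (+) over the per-peer
-- indicator maps yields multiplicities, as in dgsObjectsInflightMultiplicities.
multiplicities :: Ord a => [Set a] -> Map a Word
multiplicities = Map.unionsWith (+) . map (Map.fromSet (const 1))

-- ghci> multiplicities [Set.fromList [1,2], Set.fromList [2,3 :: Int]]
-- fromList [(1,1),(2,2),(3,1)]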
+ +-- Copied from V1: + +newtype NumObjectsProcessed + = NumObjectsProcessed + { getNumObjectsProcessed :: Word64 + } + deriving stock (Eq, Ord, Generic) + deriving newtype (NFData, NoThunks, Num, Enum, Real, Integral, Bounded) + deriving Semigroup via (Sum Word64) + deriving Monoid via (Sum Word64) + deriving Show via (Quiet NumObjectsProcessed) + +newtype ObjectMultiplicity + = ObjectMultiplicity + { getObjectMultiplicity :: Word64 + } + deriving stock (Eq, Ord, Generic) + deriving newtype (NFData, NoThunks, Num, Enum, Real, Integral, Bounded) + deriving Semigroup via (Sum Word64) + deriving Monoid via (Sum Word64) + deriving Show via (Quiet ObjectMultiplicity) + +data TraceObjectDiffusionInbound objectId object + = TraceObjectDiffusionInboundRequestedIds Int + | TraceObjectDiffusionInboundRequestedObjects Int + | TraceObjectDiffusionInboundReceivedIds Int + | TraceObjectDiffusionInboundReceivedObjects Int + | TraceObjectDiffusionInboundAddedObjects Int + | -- | Received a 'ControlMessage' from the outbound peer governor, and about + -- to act on it. + TraceObjectDiffusionInboundReceivedControlMessage ControlMessage + | TraceObjectDiffusionInboundReceivedDecision (PeerDecision objectId object) + deriving stock (Show, Eq, Generic) + +data ObjectDiffusionInboundError + = ProtocolErrorObjectNotRequested + | ProtocolErrorObjectIdsNotRequested + | ProtocolErrorObjectIdAlreadyKnown + | ProtocolErrorObjectIdsDuplicate + | ProtocolErrorObjectMissing + deriving stock (Show, Eq, Generic) + +instance Exception ObjectDiffusionInboundError where + displayException ProtocolErrorObjectNotRequested = + "The peer replied with an object we did not ask for." + displayException ProtocolErrorObjectIdsNotRequested = + "The peer replied with more objectIds than we asked for." + displayException ProtocolErrorObjectIdAlreadyKnown = + "The peer replied with an objectId that it has already sent us previously." + displayException ProtocolErrorObjectIdsDuplicate = + "The peer replied with a batch of objectIds containing a duplicate." + displayException ProtocolErrorObjectMissing = + "The peer did not deliver an object for which it claimed to have an id." diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs index 2f949d8b3b..37c37743e5 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs @@ -26,11 +26,15 @@ module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API ( ObjectPoolReader (..) , ObjectPoolWriter (..) + , SizeInBytes -- TODO: remove ) where import Control.Concurrent.Class.MonadSTM.Strict (STM) +import Data.Void (Void) -- TODO: remove import Data.Word (Word64) +type SizeInBytes = Void -- TODO: remove + -- | Interface used by the outbound side of object diffusion as its source of -- objects to give to the remote side.
data ObjectPoolReader objectId object ticketNo m diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs index 5c024618b0..c86cef1707 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs @@ -14,8 +14,8 @@ module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert ) where import Ouroboros.Consensus.Block -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1 +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound import Ouroboros.Consensus.Storage.PerasCertDB.API diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs index 8e12f01d6d..3553b8cc68 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs @@ -28,10 +28,10 @@ import Network.TypedProtocol.Channel (Channel, createConnectedChannels) import Network.TypedProtocol.Codec (AnyMessage) import Network.TypedProtocol.Driver.Simple (runPeer, runPipelinedPeer) import NoThunks.Class (NoThunks) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1 ( objectDiffusionInbound ) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State ( ObjectDiffusionInboundStateView (..) ) import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
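Finally, given the "needs to be tested and inspected" TODO on `chooseGeometricWithMedian` in Policy.hs, an empirical median check is cheap to sketch. Assuming the Policy module is in scope, something along these lines could serve as a first sanity check (the `sampleMedian` helper and the sample size are illustrative, not part of this PR):

import Data.List (sort)
import Test.QuickCheck (Gen, generate, vectorOf)

-- Draw n samples from a generator and return the (lower) median.
sampleMedian :: Ord a => Int -> Gen a -> IO a
sampleMedian n g = do
  xs <- generate (vectorOf n g)
  pure (sort xs !! ((n - 1) `div` 2))

-- ghci> sampleMedian 10001 (chooseGeometricWithMedian (6 :: Int))
-- expected to be close to 6, up to sampling noise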