diff --git a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal index 3db1710ae1..8f4159e77e 100644 --- a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal +++ b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal @@ -63,6 +63,7 @@ library Ouroboros.Consensus.Node.Exit Ouroboros.Consensus.Node.ExitPolicy Ouroboros.Consensus.Node.GSM + Ouroboros.Consensus.Node.GSM.PeerState Ouroboros.Consensus.Node.Genesis Ouroboros.Consensus.Node.Recovery Ouroboros.Consensus.Node.RethrowPolicy @@ -97,8 +98,10 @@ library random, resource-registry ^>=0.1, safe-wild-cards ^>=1.0, + semialign, serialise ^>=0.2, text, + these, time, transformers, typed-protocols:{stateful, typed-protocols}, diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index f6e2d2e4ba..8b8f27b7b2 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -71,6 +71,10 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient import Ouroboros.Consensus.MiniProtocol.ChainSync.Server import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State + ( ObjectDiffusionInboundStateView + , bracketObjectDiffusionInbound + ) import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound) import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert @@ -214,6 +218,7 @@ data Handlers m addr blk = Handlers , hPerasCertDiffusionClient :: NodeToNodeVersion -> ControlMessageSTM m -> + ObjectDiffusionInboundStateView m -> ConnectionId addr -> PerasCertDiffusionInboundPipelined blk m () , hPerasCertDiffusionServer :: @@ -316,7 +321,7 @@ mkHandlers (mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool) (getMempoolWriter getMempool) version - , hPerasCertDiffusionClient = \version controlMessageSTM peer -> + , hPerasCertDiffusionClient = \version controlMessageSTM state peer -> objectDiffusionInbound (contramap (TraceLabelPeer peer) (Node.perasCertDiffusionInboundTracer tracers)) ( perasCertDiffusionMaxFifoLength miniProtocolParameters @@ -326,6 +331,7 @@ mkHandlers (makePerasCertPoolWriterFromChainDB $ getChainDB) version controlMessageSTM + state , hPerasCertDiffusionServer = \version peer -> objectDiffusionOutbound (contramap (TraceLabelPeer peer) (Node.perasCertDiffusionOutboundTracer tracers)) @@ -864,17 +870,22 @@ mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucke } channel = do labelThisThread "PerasCertDiffusionClient" - ((), trailing) <- - runPipelinedPeerWithLimits - (TraceLabelPeer them `contramap` tPerasCertDiffusionTracer) - (cPerasCertDiffusionCodec (mkCodecs version)) - blPerasCertDiffusion - timeLimitsObjectDiffusion - channel - ( objectDiffusionInboundPeerPipelined - (hPerasCertDiffusionClient version controlMessageSTM them) - ) - return (NoInitiatorResult, trailing) + bracketObjectDiffusionInbound + version + (getPerasCertDiffusionHandles kernel) + them + $ \state -> do + ((), trailing) <- + runPipelinedPeerWithLimits + (TraceLabelPeer them `contramap` tPerasCertDiffusionTracer) + (cPerasCertDiffusionCodec (mkCodecs version)) + blPerasCertDiffusion + timeLimitsObjectDiffusion + channel + ( objectDiffusionInboundPeerPipelined + (hPerasCertDiffusionClient version controlMessageSTM state them) + ) + return (NoInitiatorResult, trailing) aPerasCertDiffusionServer :: NodeToNodeVersion -> diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs index 780602118b..6608ade58c 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs @@ -56,8 +56,6 @@ import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry import qualified Ouroboros.Consensus.Ledger.Basics as L import Ouroboros.Consensus.Node.GsmState import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) -import Ouroboros.Consensus.Util.NormalForm.StrictTVar (StrictTVar) -import qualified Ouroboros.Consensus.Util.NormalForm.StrictTVar as StrictSTM import System.FS.API ( HasFS , createDirectoryIfMissing @@ -97,7 +95,7 @@ data CandidateVersusSelection WhetherCandidateIsBetter !Bool deriving (Eq, Show) -data GsmView m upstreamPeer selection chainSyncState = GsmView +data GsmView m upstreamPeer selection peerState = GsmView { antiThunderingHerd :: Maybe StdGen -- ^ An initial seed used to randomly increase 'minCaughtUpDuration' by up -- to 15% every transition from Syncing to CaughtUp, in order to avoid a @@ -108,13 +106,13 @@ data GsmView m upstreamPeer selection chainSyncState = GsmView STM m ( selection -> - chainSyncState -> + peerState -> CandidateVersusSelection ) -- ^ Whether the candidate from the @chainSyncState@ is preferable to the -- selection. This can depend on external state (Peras certificates boosting -- blocks). - , peerIsIdle :: chainSyncState -> Bool + , peerIsIdle :: peerState -> Bool , durationUntilTooOld :: Maybe (selection -> m DurationFromNow) -- ^ How long from now until the selection will be so old that the node -- should exit the @CaughtUp@ state @@ -123,10 +121,8 @@ data GsmView m upstreamPeer selection chainSyncState = GsmView , equivalent :: selection -> selection -> Bool -- ^ Whether the two selections are equivalent for the purpose of the -- Genesis State Machine - , getChainSyncStates :: - STM m (Map.Map upstreamPeer (StrictTVar m chainSyncState)) - -- ^ The current ChainSync state with the latest candidates from the - -- upstream peers + , getPeerStates :: STM m (Map.Map upstreamPeer peerState) + -- ^ The current peer state with the latest candidates from the upstream peers , getCurrentSelection :: STM m selection -- ^ The node's current selection , minCaughtUpDuration :: NominalDiffTime @@ -244,7 +240,7 @@ realGsmEntryPoints tracerArgs gsmView = , peerIsIdle , durationUntilTooOld , equivalent - , getChainSyncStates + , getPeerStates , getCurrentSelection , minCaughtUpDuration , setCaughtUpPersistentMark @@ -370,12 +366,13 @@ realGsmEntryPoints tracerArgs gsmView = blockUntilCaughtUp :: STM m (TraceGsmEvent tracedSelection) blockUntilCaughtUp = do - -- STAGE 1: all ChainSync clients report no subsequent headers - varsState <- getChainSyncStates - states <- traverse StrictSTM.readTVar varsState + -- STAGE 1: all peers are idle, which means that + -- * all ChainSync clients report no subsequent headers, and + -- * all PerasCertDiffusion clients report no subsequent certificates + peerStates <- getPeerStates check $ - not (Map.null states) - && all peerIsIdle states + not (Map.null peerStates) + && all peerIsIdle peerStates -- STAGE 2: no candidate is better than the node's current -- selection @@ -388,16 +385,15 @@ realGsmEntryPoints tracerArgs gsmView = -- block; general Praos reasoning ensures that won't take particularly -- long. selection <- getCurrentSelection - candidates <- traverse StrictSTM.readTVar varsState candidateOverSelection <- getCandidateOverSelection let ok candidate = WhetherCandidateIsBetter False == candidateOverSelection selection candidate - check $ all ok candidates + check $ all ok peerStates pure $ GsmEventEnterCaughtUp - (Map.size states) + (Map.size peerStates) (cnvSelection selection) -- STAGE 3: the previous stages weren't so slow that the idler diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM/PeerState.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM/PeerState.hs new file mode 100644 index 0000000000..defc3abe33 --- /dev/null +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM/PeerState.hs @@ -0,0 +1,78 @@ +module Ouroboros.Consensus.Node.GSM.PeerState + ( GsmPeerState (..) + , maybeChainSyncState + , maybePerasCertDiffusionState + , mkGsmPeerStates + , gsmPeerIsIdle + ) +where + +import Cardano.Base.FeatureFlags (CardanoFeatureFlag (..)) +import Data.Align (Semialign (..)) +import Data.Map.Strict (Map) +import Data.Set (Set) +import Data.These (These (..)) +import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State + ( ChainSyncClientHandle (..) + , ChainSyncClientHandleCollection (..) + , ChainSyncState (..) + ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State + ( ObjectDiffusionInboundHandle (..) + , ObjectDiffusionInboundHandleCollection (..) + , ObjectDiffusionInboundState (..) + ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert (PerasCertDiffusionInboundState) +import Ouroboros.Consensus.Util.IOLike (MonadSTM (..), readTVar) +import Ouroboros.Network.NodeToNode.Version (isPerasEnabled) + +-- | State about peers we are connected to during initialization. +newtype GsmPeerState blk = GsmPeerState + { unGsmPeerState :: + These + (ChainSyncState blk) + (PerasCertDiffusionInboundState blk) + } + +-- | Retrieve the 'ChainSync' state of this peer, if such a connection is established. +maybeChainSyncState :: GsmPeerState blk -> Maybe (ChainSyncState blk) +maybeChainSyncState (GsmPeerState these) = + case these of + This csState -> Just csState + That _ -> Nothing + These csState _ -> Just csState + +-- | Retrieve the 'PerasCertDiffusion' state of this peer, if such a connection is established. +maybePerasCertDiffusionState :: GsmPeerState blk -> Maybe (PerasCertDiffusionInboundState blk) +maybePerasCertDiffusionState (GsmPeerState these) = + case these of + This _ -> Nothing + That pcdState -> Just pcdState + These _ pcdState -> Just pcdState + +-- | Construct a 'GsmPeerState' for all peers we are connected to. +mkGsmPeerStates :: + (Ord peer, MonadSTM m) => + ChainSyncClientHandleCollection peer m blk -> + ObjectDiffusionInboundHandleCollection peer m blk -> + STM m (Map peer (GsmPeerState blk)) +mkGsmPeerStates csHandles pcdHandles = do + csPeerStates <- traverse (readTVar . cschState) =<< cschcMap csHandles + pcdPeerStates <- traverse (readTVar . odihState) =<< odihcMap pcdHandles + pure (GsmPeerState <$> align csPeerStates pcdPeerStates) + +-- | Determine whether our connections to this peer are idle. +gsmPeerIsIdle :: Set CardanoFeatureFlag -> GsmPeerState blk -> Bool +gsmPeerIsIdle featureFlags (GsmPeerState these) = + case these of + -- We have both ChainSync and PerasCertDiffusion connections => idle if both are idling + These csState pcdState -> csIdling csState && odisIdling pcdState + -- Only a ChainSync connection is available => idle if the ChainSync connection is idling + This csState | not (perasIsEnabled csState) -> csIdling csState + -- We will soon establish a PerasCertDiffusion connection => not idling + This _ -> False + -- We will soon establish a ChainSync connection => not idling + That _ -> False + where + -- Is the Peras feature flag enabled and the peer is compatible with it? + perasIsEnabled csState = isPerasEnabled featureFlags (csNodeToNodeVersion csState) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index f529392bcf..cff137e5a6 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -27,7 +27,7 @@ module Ouroboros.Consensus.NodeKernel , toConsensusMode ) where -import Cardano.Base.FeatureFlags (CardanoFeatureFlag) +import Cardano.Base.FeatureFlags (CardanoFeatureFlag (..)) import Cardano.Network.ConsensusMode (ConsensusMode (..)) import Cardano.Network.PeerSelection.Bootstrap (UseBootstrapPeers) import Cardano.Network.PeerSelection.LocalRootPeers @@ -50,7 +50,7 @@ import Data.Functor ((<&>)) import Data.Hashable (Hashable) import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as NE -import Data.Maybe (isJust, mapMaybe) +import Data.Maybe (isJust, isNothing, mapMaybe) import Data.Proxy import Data.Set (Set) import qualified Data.Text as Text @@ -82,8 +82,16 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck ( SomeHeaderInFutureCheck ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State + ( ObjectDiffusionInboundHandleCollection (..) + , newObjectDiffusionInboundHandleCollection + ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert + ( PerasCertDiffusionInboundHandleCollection + ) import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..)) import qualified Ouroboros.Consensus.Node.GSM as GSM +import Ouroboros.Consensus.Node.GSM.PeerState (gsmPeerIsIdle, maybeChainSyncState, mkGsmPeerStates) import Ouroboros.Consensus.Node.Genesis ( GenesisNodeKernelArgs (..) , LoEAndGDDConfig (..) @@ -175,6 +183,9 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel -- from it with 'GSM.gsmStateToLedgerJudgement'. , getChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk -- ^ The kill handle and exposed state for each ChainSync client. + , getPerasCertDiffusionHandles :: + ObjectDiffusionInboundHandleCollection (ConnectionId addrNTN) m blk + -- ^ The exposed state for each Peras CertDiffusion client. , getPeerSharingRegistry :: PeerSharingRegistry addrNTN m -- ^ Read the current peer sharing registry, used for interacting with -- the PeerSharing protocol @@ -235,6 +246,7 @@ initNodeKernel args@NodeKernelArgs { registry , cfg + , featureFlags , tracers , chainDB , initChainDB @@ -257,6 +269,7 @@ initNodeKernel , mempool , peerSharingRegistry , varChainSyncHandles + , varPerasCertDiffusionHandles , varGsmState } = st @@ -275,24 +288,34 @@ initNodeKernel GSM.GsmView { GSM.antiThunderingHerd = Just gsmAntiThunderingHerd , GSM.getCandidateOverSelection = do - weights <- ChainDB.getPerasWeightSnapshot chainDB - pure $ \(headers, _lst) state -> - case AF.intersectionPoint headers (csCandidate state) of - Nothing -> GSM.CandidateDoesNotIntersect - Just{} -> - GSM.WhetherCandidateIsBetter $ -- precondition requires intersection - preferAnchoredCandidate - (configBlock cfg) - (forgetFingerprint weights) - headers - (csCandidate state) - , GSM.peerIsIdle = csIdling + weights <- forgetFingerprint <$> ChainDB.getPerasWeightSnapshot chainDB + pure $ \(headers, _lst) peerState -> do + case csCandidate <$> maybeChainSyncState peerState of + Just candidate + -- The candidate does not intersect with our current chain. + -- This is a precondition for 'WhetherCandidateIsBetter'. + | isNothing (AF.intersectionPoint headers candidate) -> + GSM.CandidateDoesNotIntersect + -- The candidate is better than our current chain. + | preferAnchoredCandidate (configBlock cfg) weights headers candidate -> + GSM.WhetherCandidateIsBetter True + -- The candidate is not better than our current chain. + | otherwise -> + GSM.WhetherCandidateIsBetter False + Nothing -> + -- We don't have an established ChainSync connection with this peer. + -- We conservatively assume that its candidate is not better than ours. + GSM.WhetherCandidateIsBetter False + , GSM.peerIsIdle = gsmPeerIsIdle featureFlags , GSM.durationUntilTooOld = gsmDurationUntilTooOld <&> \wd (_headers, lst) -> GSM.getDurationUntilTooOld wd (getTipSlot lst) , GSM.equivalent = (==) `on` (AF.headPoint . fst) - , GSM.getChainSyncStates = fmap cschState <$> cschcMap varChainSyncHandles + , GSM.getPeerStates = + mkGsmPeerStates + varChainSyncHandles + varPerasCertDiffusionHandles , GSM.getCurrentSelection = do headers <- ChainDB.getCurrentChainWithTime chainDB extLedgerState <- ChainDB.getCurrentLedger chainDB @@ -369,6 +392,7 @@ initNodeKernel , getFetchMode = readFetchMode blockFetchInterface , getGsmState = readTVar varGsmState , getChainSyncHandles = varChainSyncHandles + , getPerasCertDiffusionHandles = varPerasCertDiffusionHandles , getPeerSharingRegistry = peerSharingRegistry , getTracers = tracers , setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a @@ -419,6 +443,8 @@ data InternalState m addrNTN addrNTC blk = IS BlockFetchConsensusInterface (ConnectionId addrNTN) (HeaderWithTime blk) blk m , fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (HeaderWithTime blk) blk m , varChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk + , varPerasCertDiffusionHandles :: + PerasCertDiffusionInboundHandleCollection (ConnectionId addrNTN) m blk , varGsmState :: StrictTVar m GSM.GsmState , mempool :: Mempool m blk , peerSharingRegistry :: PeerSharingRegistry addrNTN m @@ -457,6 +483,8 @@ initInternalState newTVarIO gsmState varChainSyncHandles <- atomically newChainSyncClientHandleCollection + varPerasCertDiffusionHandles <- atomically newObjectDiffusionInboundHandleCollection + mempool <- openMempool registry diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs index 44a57f4c32..8941958814 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs @@ -142,7 +142,7 @@ setupGsm isHaaSatisfied vars = do , GSM.peerIsIdle = isIdling , GSM.durationUntilTooOld = Just durationUntilTooOld , GSM.equivalent = (==) -- unsound, but harmless in this test - , GSM.getChainSyncStates = readTVar varStates + , GSM.getPeerStates = traverse readTVar =<< readTVar varStates , GSM.getCurrentSelection = readTVar varSelection , GSM.minCaughtUpDuration = thrashLimit , GSM.setCaughtUpPersistentMark = \b -> diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/DensityDisconnect.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/DensityDisconnect.hs index bacebe644f..eeb39af6f7 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/DensityDisconnect.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/DensityDisconnect.hs @@ -192,6 +192,7 @@ prop_densityDisconnectStatic = { csCandidate = frag , csLatestSlot = SJust (AF.headSlot frag) , csIdling = False + , csNodeToNodeVersion = maxBound } gen = do gt <- genChains (QC.choose (1, 4)) @@ -431,6 +432,7 @@ evolveBranches EvolvingPeers{k, sgen, peers = initialPeers, fullTree} = { csCandidate = attachTimeUsingTestConfig csCandidate , csIdling = False , csLatestSlot = SJust (AF.headSlot csCandidate) + , csNodeToNodeVersion = maxBound } -- Run GDD. (loeFrag, suffixes) = diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/LoE/CaughtUp.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/LoE/CaughtUp.hs index a58923bd60..e87ca885ec 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/LoE/CaughtUp.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/LoE/CaughtUp.hs @@ -142,16 +142,15 @@ run = withRegistry \registry -> do -- Then, send C. atomically $ modifyTVar (cschState hdl) $ \s -> - ChainSyncState + s { csCandidate = csCandidate s AF.:> attachSlotTime cfg (getHeader blkC) , csLatestSlot = pure $ NotOrigin $ blockSlot blkC - , csIdling = csIdling s } addBlk blkC -- Finally, roll back to the initial fragment and idle. - atomically $ modifyTVar (cschState hdl) $ \_s -> - ChainSyncState + atomically $ modifyTVar (cschState hdl) $ \s -> + s { csCandidate = initialFrag , csLatestSlot = pure $ AF.headSlot initialFrag , csIdling = True @@ -169,7 +168,7 @@ run = withRegistry \registry -> do -- Finally, idle. atomically $ modifyTVar (cschState hdl) $ \s -> - ChainSyncState + s { csCandidate = csCandidate s , csLatestSlot = csLatestSlot s , csIdling = True @@ -223,6 +222,7 @@ mkTestChainSyncClientHandle frag = do { csCandidate = frag , csIdling = False , csLatestSlot = pure $ AF.headSlot frag + , csNodeToNodeVersion = maxBound } varJumping <- newTVar $ Disengaged DisengagedDone varJumpInfo <- newTVar Nothing @@ -283,7 +283,7 @@ mkGsmEntryPoints varChainSyncHandles chainDB writeGsmState = { GSM.getCandidateOverSelection = pure candidateOverSelection , GSM.peerIsIdle = csIdling , GSM.equivalent = (==) `on` AF.headPoint - , GSM.getChainSyncStates = fmap cschState <$> cschcMap varChainSyncHandles + , GSM.getPeerStates = traverse readTVar =<< fmap cschState <$> cschcMap varChainSyncHandles , GSM.getCurrentSelection = ChainDB.getCurrentChain chainDB , -- Make sure that we stay in CaughtUp for the duration of the test once we -- have entered it. diff --git a/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs b/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs index 70854581a8..9d983608d2 100644 --- a/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs +++ b/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs @@ -37,6 +37,7 @@ import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck import Ouroboros.Consensus.MiniProtocol.ChainSync.Server ( chainSyncServerForFollower ) +import qualified Ouroboros.Consensus.MiniProtocol.Util.Idling as Idling import Ouroboros.Consensus.Node.NetworkProtocolVersion ( NodeToNodeVersion ) @@ -158,7 +159,7 @@ oneBenchRun , CSClient.headerMetricsTracer = nullTracer , CSClient.setCandidate = writeTVar varCandidate , CSClient.setLatestSlot = \_ -> pure () - , CSClient.idling = CSClient.noIdling + , CSClient.idling = Idling.noIdling , CSClient.loPBucket = CSClient.noLoPBucket , CSClient.jumping = CSClient.noJumping } diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index e67214f5c7..acbca582c1 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -192,10 +192,13 @@ library Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert + Ouroboros.Consensus.MiniProtocol.Util + Ouroboros.Consensus.MiniProtocol.Util.Idling Ouroboros.Consensus.Node.GsmState Ouroboros.Consensus.Node.InitStorage Ouroboros.Consensus.Node.NetworkProtocolVersion diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index fcb0e25388..85b0b1a487 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -73,7 +73,6 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client , Jumping.noJumping , chainSyncStateFor , newChainSyncClientHandleCollection - , noIdling , noLoPBucket , viewChainSyncState ) where @@ -122,6 +121,7 @@ import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCh import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as Jumping import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State +import Ouroboros.Consensus.MiniProtocol.Util.Idling (Idling (..)) import Ouroboros.Consensus.Node.GsmState (GsmState (..)) import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Peras.Weight (emptyPerasWeightSnapshot) @@ -272,26 +272,6 @@ chainSyncStateFor :: chainSyncStateFor varHandles peer = readTVar . cschState . (Map.! peer) =<< readTVar varHandles --- | Interface for the ChainSync client to manipulate the idling flag in --- 'ChainSyncState'. -data Idling m = Idling - { idlingStart :: !(m ()) - -- ^ Mark the peer as being idle. - , idlingStop :: !(m ()) - -- ^ Mark the peer as not being idle. - } - deriving stock Generic - -deriving anyclass instance IOLike m => NoThunks (Idling m) - --- | No-op implementation, for tests. -noIdling :: Applicative m => Idling m -noIdling = - Idling - { idlingStart = pure () - , idlingStop = pure () - } - -- | Interface to the LoP implementation for the ChainSync client. data LoPBucket m = LoPBucket { lbPause :: !(m ()) @@ -405,6 +385,7 @@ bracketChainSyncClient { csCandidate = AF.Empty AF.AnchorGenesis , csLatestSlot = SNothing , csIdling = False + , csNodeToNodeVersion = version } withCSJCallbacks :: diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/State.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/State.hs index d7dd82db7b..7077aba1b4 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/State.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/State.hs @@ -37,6 +37,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol ( LedgerSupportsProtocol ) import Ouroboros.Consensus.Node.GsmState (GsmState) +import Ouroboros.Consensus.Node.NetworkProtocolVersion (NodeToNodeVersion) import Ouroboros.Consensus.Util.IOLike ( IOLike , NoThunks (..) @@ -74,6 +75,11 @@ data ChainSyncState blk = ChainSyncState -- processing it further, and the latest slot may refer to a header beyond -- the forecast horizon while the candidate fragment isn't extended yet, to -- signal to GDD that the density is known up to this slot. + , csNodeToNodeVersion :: !NodeToNodeVersion + -- ^ Negotiated version of the protocol with the peer. + -- + -- This is used to determine later on whether other mini-protocols are + -- expected to run in parallel with this one. } deriving stock Generic diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs index bba2d07cb0..a368682c40 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs @@ -38,7 +38,11 @@ import Data.Word (Word64) import GHC.Generics (Generic) import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt) import NoThunks.Class (NoThunks (..), unsafeNoThunks) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State + ( ObjectDiffusionInboundStateView (..) + ) import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.MiniProtocol.Util.Idling qualified as Idling import Ouroboros.Network.ControlMessage import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound @@ -62,6 +66,8 @@ data TraceObjectDiffusionInbound objectId object TraceObjectDiffusionControlMessage ControlMessage | TraceObjectInboundCanRequestMoreObjects Int | TraceObjectInboundCannotRequestMoreObjects Int + | TraceObjectInboundStartedIdling + | TraceObjectInboundStoppedIdling deriving (Eq, Show) data ObjectDiffusionInboundError @@ -131,13 +137,15 @@ objectDiffusionInbound :: ObjectPoolWriter objectId object m -> NodeToNodeVersion -> ControlMessageSTM m -> + ObjectDiffusionInboundStateView m -> ObjectDiffusionInboundPipelined objectId object m () objectDiffusionInbound tracer (maxFifoLength, maxNumIdsToReq, maxNumObjectsToReq) ObjectPoolWriter{..} _version - controlMessageSTM = + controlMessageSTM + state = ObjectDiffusionInboundPipelined $ do continueWithStateM (go Zero) initialInboundSt where @@ -242,6 +250,12 @@ objectDiffusionInbound -- objectIds. Since this is the only thing to do now, we make this a -- blocking call. traceWith tracer (TraceObjectInboundCannotRequestMoreObjects (natToInt n)) + -- Before blocking, signal to the protocol client that we are idling + -- + -- NOTE this change of state should be made explicit: + -- https://github.com/tweag/cardano-peras/issues/144 + Idling.idlingStart (odisvIdling state) + traceWith tracer TraceObjectInboundStartedIdling pure $ continueWithState goReqObjectIdsBlocking st -- We have pipelined some requests, so there are some replies in flight. @@ -378,7 +392,13 @@ objectDiffusionInbound $ SendMsgRequestObjectIdsBlocking (numToAckOnNextReq st) numIdsToRequest - ( \neCollectedIds -> + ( \neCollectedIds -> do + -- We just got some new object id's, so we are no longer idling + -- + -- NOTE this change of state should be made explicit: + -- https://github.com/tweag/cardano-peras/issues/144 + Idling.idlingStop (odisvIdling state) + traceWith tracer TraceObjectInboundStoppedIdling collectAndContinueWithState (goCollect Zero) st diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/State.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/State.hs new file mode 100644 index 0000000000..58402da64f --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound/State.hs @@ -0,0 +1,138 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE UndecidableInstances #-} + +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State + ( ObjectDiffusionInboundState (..) + , initObjectDiffusionInboundState + , ObjectDiffusionInboundHandle (..) + , ObjectDiffusionInboundHandleCollection (..) + , ObjectDiffusionInboundStateView (..) + , newObjectDiffusionInboundHandleCollection + , bracketObjectDiffusionInbound + ) +where + +import Control.Monad.Class.MonadThrow (bracket) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import GHC.Generics (Generic) +import NoThunks.Class (NoThunks) +import Ouroboros.Consensus.Block (BlockSupportsProtocol, HasHeader, Header) +import Ouroboros.Consensus.MiniProtocol.Util.Idling (Idling (..)) +import Ouroboros.Consensus.Node.NetworkProtocolVersion (NodeToNodeVersion) +import Ouroboros.Consensus.Util.IOLike + ( IOLike (..) + , MonadSTM (..) + , StrictTVar + , modifyTVar + , newTVar + , newTVarIO + , readTVar + ) + +-- | An ObjectDiffusion inbound client state that's used by other components. +-- +-- NOTE: 'blk' is not needed for now, but we keep it for future use. +data ObjectDiffusionInboundState blk = ObjectDiffusionInboundState + { odisIdling :: !Bool + -- ^ Whether the client is currently idling + , odisNodeToNodeVersion :: !NodeToNodeVersion + -- ^ Negotiated version of the protocol with the peer. + -- + -- This is used to determine later on whether other mini-protocols are + -- expected to run in parallel with this one. + } + deriving stock Generic + +deriving anyclass instance + ( HasHeader blk + , NoThunks (Header blk) + ) => + NoThunks (ObjectDiffusionInboundState blk) + +initObjectDiffusionInboundState :: NodeToNodeVersion -> ObjectDiffusionInboundState blk +initObjectDiffusionInboundState version = + ObjectDiffusionInboundState + { odisIdling = True + , odisNodeToNodeVersion = version + } + +-- | An interface to an ObjectDiffusion inbound client that's used by other components. +data ObjectDiffusionInboundHandle m blk = ObjectDiffusionInboundHandle + { odihState :: !(StrictTVar m (ObjectDiffusionInboundState blk)) + -- ^ Data shared between the client and external components. + } + deriving stock Generic + +deriving anyclass instance + ( IOLike m + , HasHeader blk + , NoThunks (Header blk) + ) => + NoThunks (ObjectDiffusionInboundHandle m blk) + +-- | A collection of ObjectDiffusion inbound client handles for the peers of this node. +data ObjectDiffusionInboundHandleCollection peer m blk = ObjectDiffusionInboundHandleCollection + { odihcMap :: !(STM m (Map peer (ObjectDiffusionInboundHandle m blk))) + -- ^ A map containing the handles for the peers in the collection + , odihcAddHandle :: !(peer -> ObjectDiffusionInboundHandle m blk -> STM m ()) + -- ^ Add the handle for the given peer to the collection + , odihcRemoveHandle :: !(peer -> STM m ()) + -- ^ Remove the handle for the given peer from the collection + } + deriving stock Generic + +newObjectDiffusionInboundHandleCollection :: + (Ord peer, IOLike m, NoThunks peer, BlockSupportsProtocol blk) => + STM m (ObjectDiffusionInboundHandleCollection peer m blk) +newObjectDiffusionInboundHandleCollection = do + handlesMap <- newTVar mempty + return + ObjectDiffusionInboundHandleCollection + { odihcMap = readTVar handlesMap + , odihcAddHandle = \peer handle -> + modifyTVar handlesMap (Map.insert peer handle) + , odihcRemoveHandle = \peer -> + modifyTVar handlesMap (Map.delete peer) + } + +-- | Interface for the ObjectDiffusion client to its state allocated by +-- 'bracketObjectDiffusionInbound'. +data ObjectDiffusionInboundStateView m = ObjectDiffusionInboundStateView + { odisvIdling :: !(Idling m) + } + deriving stock Generic + +bracketObjectDiffusionInbound :: + forall m peer blk a. + (IOLike m, HasHeader blk, NoThunks (Header blk)) => + NodeToNodeVersion -> + ObjectDiffusionInboundHandleCollection peer m blk -> + peer -> + (ObjectDiffusionInboundStateView m -> m a) -> + m a +bracketObjectDiffusionInbound version handles peer body = do + odiState <- newTVarIO (initObjectDiffusionInboundState version) + bracket (acquireContext odiState) releaseContext body + where + acquireContext odiState = atomically $ do + odihcAddHandle handles peer $ + ObjectDiffusionInboundHandle + { odihState = odiState + } + return + ObjectDiffusionInboundStateView + { odisvIdling = + Idling + { idlingStart = atomically $ modifyTVar odiState $ \s -> s{odisIdling = True} + , idlingStop = atomically $ modifyTVar odiState $ \s -> s{odisIdling = False} + } + } + + releaseContext _ = atomically $ do + odihcRemoveHandle handles peer diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs index ba0ba934a2..5c024618b0 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs @@ -8,10 +8,14 @@ module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert , PerasCertDiffusionInboundPipelined , PerasCertDiffusionOutbound , PerasCertDiffusion + , PerasCertDiffusionInboundState + , PerasCertDiffusionInboundHandle + , PerasCertDiffusionInboundHandleCollection ) where import Ouroboros.Consensus.Block import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound import Ouroboros.Consensus.Storage.PerasCertDB.API @@ -39,3 +43,12 @@ type PerasCertDiffusionOutbound blk m a = type PerasCertDiffusion blk = ObjectDiffusion PerasRoundNo (PerasCert blk) + +type PerasCertDiffusionInboundState blk = + ObjectDiffusionInboundState blk + +type PerasCertDiffusionInboundHandle m blk = + ObjectDiffusionInboundHandle m blk + +type PerasCertDiffusionInboundHandleCollection peer m blk = + ObjectDiffusionInboundHandleCollection peer m blk diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/Util.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/Util.hs new file mode 100644 index 0000000000..58fa7d2161 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/Util.hs @@ -0,0 +1,5 @@ +module Ouroboros.Consensus.MiniProtocol.Util + ( module Ouroboros.Consensus.MiniProtocol.Util.Idling + ) where + +import Ouroboros.Consensus.MiniProtocol.Util.Idling diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/Util/Idling.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/Util/Idling.hs new file mode 100644 index 0000000000..3962d26dd6 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/Util/Idling.hs @@ -0,0 +1,31 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE StandaloneDeriving #-} + +module Ouroboros.Consensus.MiniProtocol.Util.Idling + ( Idling (..) + , noIdling + ) where + +import GHC.Generics (Generic) +import Ouroboros.Consensus.Util.IOLike (IOLike, NoThunks) + +-- | Interface to manipulate the idling flag in the client state of a peer. +data Idling m = Idling + { idlingStart :: !(m ()) + -- ^ Mark the peer as being idle. + , idlingStop :: !(m ()) + -- ^ Mark the peer as not being idle. + } + deriving stock Generic + +deriving anyclass instance IOLike m => NoThunks (Idling m) + +-- | No-op implementation, for tests. +noIdling :: Applicative m => Idling m +noIdling = + Idling + { idlingStart = pure () + , idlingStop = pure () + } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs index a623d0b9a9..71bc19a5f8 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs @@ -31,6 +31,7 @@ import NoThunks.Class , OnlyCheckWhnfNamed (..) , allNoThunks ) +import Ouroboros.Consensus.Node.NetworkProtocolVersion (NodeToNodeVersion) import Ouroboros.Network.Util.ShowProxy import System.FS.API (SomeHasFS) import System.FS.API.Types (FsPath, Handle) @@ -85,6 +86,9 @@ instance NoThunks a => NoThunks (MultiSet a) where showTypeOf _ = "MultiSet" wNoThunks ctxt = wNoThunks ctxt . MultiSet.toMap +-- NOTE: fixed in https://github.com/IntersectMBO/ouroboros-network/pull/5214 +instance NoThunks NodeToNodeVersion + {------------------------------------------------------------------------------- fs-api -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs index d2f21c9b66..8e12f01d6d 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs @@ -31,11 +31,15 @@ import NoThunks.Class (NoThunks) import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound ( objectDiffusionInbound ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State + ( ObjectDiffusionInboundStateView (..) + ) import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API ( ObjectPoolReader (..) , ObjectPoolWriter (..) ) import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound) +import qualified Ouroboros.Consensus.MiniProtocol.Util.Idling as Idling import Ouroboros.Consensus.Util.IOLike ( IOLike , MonadDelay (..) @@ -257,6 +261,11 @@ prop_smoke_object_diffusion controlMessage <- uncheckedNewTVarM Continue let + inboundState = + ObjectDiffusionInboundStateView + { odisvIdling = Idling.noIdling + } + inbound = objectDiffusionInbound tracer @@ -267,6 +276,7 @@ prop_smoke_object_diffusion inboundPoolWriter nodeToNodeVersion (readTVar controlMessage) + inboundState outbound = objectDiffusionOutbound