diff --git a/ouroboros-network-api/src/Ouroboros/Network/BlockFetch/ConsensusInterface.hs b/ouroboros-network-api/src/Ouroboros/Network/BlockFetch/ConsensusInterface.hs index c804383819c..4c5e9b28c0e 100644 --- a/ouroboros-network-api/src/Ouroboros/Network/BlockFetch/ConsensusInterface.hs +++ b/ouroboros-network-api/src/Ouroboros/Network/BlockFetch/ConsensusInterface.hs @@ -1,18 +1,24 @@ {-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} module Ouroboros.Network.BlockFetch.ConsensusInterface ( FetchMode (..) , BlockFetchConsensusInterface (..) , WhetherReceivingTentativeBlocks (..) , FromConsensus (..) + , ChainSelStarvation (..) ) where import Control.Monad.Class.MonadSTM import Control.Monad.Class.MonadTime (UTCTime) +import Control.Monad.Class.MonadTime.SI (Time) import Data.Map.Strict (Map) import GHC.Stack (HasCallStack) +import GHC.Generics (Generic) +import NoThunks.Class (NoThunks) import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import Ouroboros.Network.Block @@ -149,7 +155,17 @@ data BlockFetchConsensusInterface peer header block m = -- PRECONDITION: Same as 'headerForgeUTCTime'. -- -- WARNING: Same as 'headerForgeUTCTime'. - blockForgeUTCTime :: FromConsensus block -> STM m UTCTime + blockForgeUTCTime :: FromConsensus block -> STM m UTCTime, + + -- | Information on the ChainSel starvation status; whether it is ongoing + -- or has ended recently. Needed by the bulk sync decision logic. + readChainSelStarvation :: STM m ChainSelStarvation, + + -- | Action to inform CSJ that the given peer has not been performing + -- adequately with respect to BlockFetch, and that it should be demoted + -- from the dynamo role. Can be set to @const (pure ())@ in all other + -- scenarios. + demoteCSJDynamo :: peer -> m () } @@ -159,6 +175,16 @@ data WhetherReceivingTentativeBlocks = ReceivingTentativeBlocks | NotReceivingTentativeBlocks +-- | Whether ChainSel is starved or has been recently. +-- +-- The bulk sync fetch decision logic needs to decide whether the current +-- focused peer has starved ChainSel recently. This datatype is used to +-- represent this piece of information. 
+data ChainSelStarvation + = ChainSelStarvationOngoing + | ChainSelStarvationEndedAt Time + deriving (Eq, Show, NoThunks, Generic) + {------------------------------------------------------------------------------- Syntactic indicator of key precondition about Consensus time conversions -------------------------------------------------------------------------------} diff --git a/ouroboros-network/demo/chain-sync.hs b/ouroboros-network/demo/chain-sync.hs index 4c7acd3935a..963c5106e8d 100644 --- a/ouroboros-network/demo/chain-sync.hs +++ b/ouroboros-network/demo/chain-sync.hs @@ -31,6 +31,7 @@ import Control.Concurrent.Async import Control.Concurrent.Class.MonadSTM.Strict import Control.Exception import Control.Monad (when) +import Control.Monad.Class.MonadTime.SI (Time (..)) import Control.Tracer import System.Directory @@ -75,6 +76,7 @@ import Ouroboros.Network.Protocol.BlockFetch.Type qualified as BlockFetch import Ouroboros.Network.BlockFetch import Ouroboros.Network.BlockFetch.Client import Ouroboros.Network.BlockFetch.ClientRegistry (FetchClientRegistry (..)) +import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation(..)) import Ouroboros.Network.DeltaQ (defaultGSV) @@ -440,7 +442,10 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do blockMatchesHeader = \_ _ -> True, headerForgeUTCTime, - blockForgeUTCTime = headerForgeUTCTime . fmap blockHeader + blockForgeUTCTime = headerForgeUTCTime . fmap blockHeader, + + readChainSelStarvation = pure (ChainSelStarvationEndedAt (Time 0)), + demoteCSJDynamo = \_ -> pure () } where plausibleCandidateChain cur candidate = @@ -506,11 +511,14 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do blockFetchPolicy registry (BlockFetchConfiguration { - bfcMaxConcurrencyBulkSync = 1, bfcMaxConcurrencyDeadline = 2, bfcMaxRequestsInflight = 10, - bfcDecisionLoopInterval = 0.01, - bfcSalt = 0 + bfcDecisionLoopIntervalBulkSync = 0.04, + bfcDecisionLoopIntervalDeadline = 0.01, + bfcSalt = 0, + bfcGenesisBFConfig = GenesisBlockFetchConfiguration + { gbfcBulkSyncGracePeriod = 10 -- seconds + } }) >> return () diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index 12bf433f52d..694b89fdc09 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -33,6 +33,9 @@ library Ouroboros.Network.BlockFetch.ClientRegistry Ouroboros.Network.BlockFetch.ClientState Ouroboros.Network.BlockFetch.Decision + Ouroboros.Network.BlockFetch.Decision.BulkSync + Ouroboros.Network.BlockFetch.Decision.Deadline + Ouroboros.Network.BlockFetch.Decision.Trace Ouroboros.Network.BlockFetch.DeltaQ Ouroboros.Network.BlockFetch.State Ouroboros.Network.DeltaQ @@ -113,6 +116,7 @@ library cborg >=0.2.1 && <0.3, containers, deepseq, + dlist, dns, hashable, iproute, @@ -121,6 +125,7 @@ library nothunks, psqueues >=0.2.3 && <0.3, random, + transformers, cardano-prelude, cardano-slotting, @@ -337,6 +342,7 @@ executable demo-chain-sync contra-tracer, + si-timers, typed-protocols, strict-stm, ouroboros-network-api, diff --git a/ouroboros-network/sim-tests-lib/Ouroboros/Network/BlockFetch/Examples.hs b/ouroboros-network/sim-tests-lib/Ouroboros/Network/BlockFetch/Examples.hs index fb9c0bee85c..1225f13d47c 100644 --- a/ouroboros-network/sim-tests-lib/Ouroboros/Network/BlockFetch/Examples.hs +++ b/ouroboros-network/sim-tests-lib/Ouroboros/Network/BlockFetch/Examples.hs @@ -40,10 +40,12 @@ import Ouroboros.Network.Block import Network.TypedProtocol.Core import 
Network.TypedProtocol.Pipelined +import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.ControlMessage (ControlMessageSTM) import Ouroboros.Network.BlockFetch import Ouroboros.Network.BlockFetch.Client +import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation(..)) import Ouroboros.Network.Channel import Ouroboros.Network.DeltaQ import Ouroboros.Network.Driver @@ -55,6 +57,7 @@ import Ouroboros.Network.Protocol.BlockFetch.Type import Ouroboros.Network.Util.ShowProxy import Ouroboros.Network.Mock.ConcreteBlock +import Ouroboros.Network.BlockFetch.Decision.Trace (TraceDecisionEvent) -- | Run a single block fetch protocol until the chain is downloaded. @@ -63,8 +66,7 @@ blockFetchExample0 :: forall m. (MonadSTM m, MonadST m, MonadAsync m, MonadDelay m, MonadFork m, MonadTime m, MonadTimer m, MonadMask m, MonadThrow (STM m)) - => Tracer m [TraceLabelPeer Int - (FetchDecision [Point BlockHeader])] + => Tracer m (TraceDecisionEvent Int BlockHeader) -> Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader)) -> Tracer m (TraceLabelPeer Int @@ -134,11 +136,14 @@ blockFetchExample0 decisionTracer clientStateTracer clientMsgTracer (sampleBlockFetchPolicy1 headerForgeUTCTime blockHeap currentChainHeaders candidateChainHeaders) registry (BlockFetchConfiguration { - bfcMaxConcurrencyBulkSync = 1, bfcMaxConcurrencyDeadline = 2, bfcMaxRequestsInflight = 10, - bfcDecisionLoopInterval = 0.01, - bfcSalt = 0 + bfcDecisionLoopIntervalBulkSync = 0.04, + bfcDecisionLoopIntervalDeadline = 0.01, + bfcSalt = 0, + bfcGenesisBFConfig = GenesisBlockFetchConfiguration + { gbfcBulkSyncGracePeriod = 10 -- seconds + } }) >> return () @@ -172,8 +177,7 @@ blockFetchExample1 :: forall m. (MonadSTM m, MonadST m, MonadAsync m, MonadDelay m, MonadFork m, MonadTime m, MonadTimer m, MonadMask m, MonadThrow (STM m)) - => Tracer m [TraceLabelPeer Int - (FetchDecision [Point BlockHeader])] + => Tracer m (TraceDecisionEvent Int BlockHeader) -> Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader)) -> Tracer m (TraceLabelPeer Int @@ -211,7 +215,7 @@ blockFetchExample1 decisionTracer clientStateTracer clientMsgTracer driverAsync <- async $ do threadId <- myThreadId labelThread threadId "block-fetch-driver" - driver blockHeap + downloadTimer -- Order of shutdown here is important for this example: must kill off the -- fetch thread before the peer threads. @@ -243,25 +247,25 @@ blockFetchExample1 decisionTracer clientStateTracer clientMsgTracer (sampleBlockFetchPolicy1 headerForgeUTCTime blockHeap currentChainHeaders candidateChainHeaders) registry (BlockFetchConfiguration { - bfcMaxConcurrencyBulkSync = 1, bfcMaxConcurrencyDeadline = 2, bfcMaxRequestsInflight = 10, - bfcDecisionLoopInterval = 0.01, - bfcSalt = 0 + bfcDecisionLoopIntervalBulkSync = 0.04, + bfcDecisionLoopIntervalDeadline = 0.01, + bfcSalt = 0, + bfcGenesisBFConfig = GenesisBlockFetchConfiguration + { gbfcBulkSyncGracePeriod = 10 -- seconds + } }) >> return () headerForgeUTCTime (FromConsensus x) = pure $ convertSlotToTimeForTestsAssumingNoHardFork (blockSlot x) - driver :: TestFetchedBlockHeap m Block -> m () - driver blockHeap = do - atomically $ do - heap <- getTestFetchedBlocks blockHeap - check $ - all (\c -> AnchoredFragment.headPoint c `Set.member` heap) - candidateChains - + -- | Terminates after 1 second per block in the candidate chains. 
+ downloadTimer :: m () + downloadTimer = + let totalBlocks = sum $ map AF.length candidateChains + in threadDelay (fromIntegral totalBlocks) -- -- Sample block fetch configurations @@ -293,7 +297,10 @@ sampleBlockFetchPolicy1 headerFieldsForgeUTCTime blockHeap currentChain candidat blockMatchesHeader = \_ _ -> True, headerForgeUTCTime = headerFieldsForgeUTCTime, - blockForgeUTCTime = headerFieldsForgeUTCTime + blockForgeUTCTime = headerFieldsForgeUTCTime, + + readChainSelStarvation = pure (ChainSelStarvationEndedAt (Time 0)), + demoteCSJDynamo = \_ -> pure () } where plausibleCandidateChain cur candidate = diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/BlockFetch.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/BlockFetch.hs index 7a241ceecf8..3bab001318c 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/BlockFetch.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/BlockFetch.hs @@ -52,6 +52,7 @@ import Ouroboros.Network.NodeToNode.Version (isPipeliningEnabled) import Ouroboros.Network.Protocol.BlockFetch.Type (BlockFetch) import Ouroboros.Network.Testing.Utils +import Ouroboros.Network.BlockFetch.Decision.Trace (TraceDecisionEvent) -- @@ -60,11 +61,11 @@ import Ouroboros.Network.Testing.Utils tests :: TestTree tests = testGroup "BlockFetch" - [ testProperty "static chains without overlap" - prop_blockFetchStaticNoOverlap + [ testProperty "BulkSync static chains without overlap" + prop_blockFetchBulkSyncStaticNoOverlap - , testProperty "static chains with overlap" - prop_blockFetchStaticWithOverlap + , testProperty "BulkSync static chains with overlap" + prop_blockFetchBulkSyncStaticWithOverlap , testCaseSteps "bracketSyncWithFetchClient" unit_bracketSyncWithFetchClient @@ -85,8 +86,8 @@ tests = testGroup "BlockFetch" -- | In this test we have two candidates chains that are static throughout the -- run. The two chains share some common prefix (genesis in the degenerate --- case). The test runs the block fetch logic to download all of both chain --- candidates. +-- case). The test runs the block fetch logic to download all blocks of the +-- longest candidate chain (either of them if they are of equal length). -- -- In this variant we set up the common prefix of the two candidates as the -- \"current\" chain. This means the block fetch only has to download the @@ -101,8 +102,8 @@ tests = testGroup "BlockFetch" -- * 'tracePropertyClientStateSanity' -- * 'tracePropertyInFlight' -- -prop_blockFetchStaticNoOverlap :: TestChainFork -> Property -prop_blockFetchStaticNoOverlap (TestChainFork common fork1 fork2) = +prop_blockFetchBulkSyncStaticNoOverlap :: TestChainFork -> Property +prop_blockFetchBulkSyncStaticNoOverlap (TestChainFork common fork1 fork2) = let trace = selectTraceEventsDynamic (runSimTrace simulation) in counterexample ("\nTrace:\n" ++ unlines (map show trace)) $ @@ -156,10 +157,10 @@ prop_blockFetchStaticNoOverlap (TestChainFork common fork1 fork2) = -- * 'tracePropertyClientStateSanity' -- * 'tracePropertyInFlight' -- --- TODO: 'prop_blockFetchStaticWithOverlap' fails if we introduce delays. issue #2622 +-- TODO: 'prop_blockFetchBulkSyncStaticWithOverlap' fails if we introduce delays. 
issue #2622
 --
-prop_blockFetchStaticWithOverlap :: TestChainFork -> Property
-prop_blockFetchStaticWithOverlap (TestChainFork _common fork1 fork2) =
+prop_blockFetchBulkSyncStaticWithOverlap :: TestChainFork -> Property
+prop_blockFetchBulkSyncStaticWithOverlap (TestChainFork _common fork1 fork2) =
     let trace = selectTraceEventsDynamic (runSimTrace simulation)
      in counterexample ("\nTrace:\n" ++ unlines (map show trace)) $
@@ -206,8 +207,7 @@ chainPoints = map (castPoint . blockPoint)
               . AnchoredFragment.toOldestFirst
 
 data Example1TraceEvent =
-     TraceFetchDecision   [TraceLabelPeer Int
-                            (FetchDecision [Point BlockHeader])]
+     TraceFetchDecision   (TraceDecisionEvent Int BlockHeader)
   | TraceFetchClientState (TraceLabelPeer Int
                             (TraceFetchClientState BlockHeader))
   | TraceFetchClientSendRecv (TraceLabelPeer Int
@@ -223,7 +223,8 @@ instance Show Example1TraceEvent where
 -- blocks in the 'FetchRequest's added by the decision logic and the blocks
 -- received by the fetch clients; check that the ordered sequence of blocks
 -- requested and completed by both fetch clients is exactly the sequence
--- expected. The expected sequence is exactly the chain suffixes in order.
+-- expected. The expected sequence is exactly the suffix of the longest chain,
+-- or the suffix of either chain if they are of equal length.
 --
 -- This property is stronger than 'tracePropertyBlocksRequestedAndRecievedAllPeers'
 -- since it works with sequences rather than sets and for each chain
@@ -240,15 +241,24 @@ tracePropertyBlocksRequestedAndRecievedPerPeer
   -> [Example1TraceEvent]
   -> Property
 tracePropertyBlocksRequestedAndRecievedPerPeer fork1 fork2 es =
-    requestedFetchPoints === requiredFetchPoints
-    .&&. receivedFetchPoints === requiredFetchPoints
+    counterexample "should request the expected blocks"
+      (disjoin $ map (requestedFetchPoints ===) requiredFetchPoints)
+    .&&. counterexample "should receive the expected blocks"
+      (disjoin $ map (receivedFetchPoints ===) requiredFetchPoints)
   where
    requiredFetchPoints =
+     if AnchoredFragment.length fork1 == AnchoredFragment.length fork2
+       then [ requiredFetchPointsFor 1 fork1
+            , requiredFetchPointsFor 2 fork2
+            , Map.union (requiredFetchPointsFor 1 fork1) (requiredFetchPointsFor 2 fork2)
+            ]
+       else if AnchoredFragment.length fork1 < AnchoredFragment.length fork2
+         then [requiredFetchPointsFor 2 fork2]
+         else [requiredFetchPointsFor 1 fork1]
+
+   requiredFetchPointsFor peer fork =
      Map.filter (not . Prelude.null) $
-      Map.fromList $
-      [ (1, chainPoints fork1)
-      , (2, chainPoints fork2)
-      ]
+      Map.fromList [ (peer, chainPoints fork) ]
 
    requestedFetchPoints :: Map Int [Point BlockHeader]
    requestedFetchPoints =
@@ -274,8 +284,8 @@ tracePropertyBlocksRequestedAndRecievedPerPeer fork1 fork2 es =
 -- blocks in the 'FetchRequest's added by the decision logic and the blocks
 -- received by the fetch clients; check that the set of all blocks requested
 -- across the two peers is the set of blocks we expect, and similarly for the
--- set of all blocks received. The expected set of blocks is the union of the
--- blocks on the two candidate chains.
+-- set of all blocks received. The expected set of blocks is the blocks of the
+-- longest candidate chain, or of either chain if they have the same length.
-- -- This property is weaker than 'tracePropertyBlocksRequestedAndRecievedPerPeer' -- since it does not involve order or frequency, but it holds for the general @@ -287,11 +297,23 @@ tracePropertyBlocksRequestedAndRecievedAllPeers -> [Example1TraceEvent] -> Property tracePropertyBlocksRequestedAndRecievedAllPeers fork1 fork2 es = - requestedFetchPoints === requiredFetchPoints - .&&. receivedFetchPoints === requiredFetchPoints + counterexample "should request the expected blocks" + (disjoin $ map (requestedFetchPoints ===) requiredFetchPoints) + .&&. counterexample "should receive the expected blocks" + (disjoin $ map (receivedFetchPoints ===) requiredFetchPoints) where requiredFetchPoints = - Set.fromList (chainPoints fork1 ++ chainPoints fork2) + if AnchoredFragment.length fork1 == AnchoredFragment.length fork2 + then [ requiredFetchPointsFor fork1 + , requiredFetchPointsFor fork2 + , Set.union (requiredFetchPointsFor fork1) (requiredFetchPointsFor fork2) + ] + else if AnchoredFragment.length fork1 < AnchoredFragment.length fork2 + then [requiredFetchPointsFor fork2] + else [requiredFetchPointsFor fork1] + + requiredFetchPointsFor fork = + Set.fromList $ chainPoints fork requestedFetchPoints :: Set (Point BlockHeader) requestedFetchPoints = @@ -504,7 +526,7 @@ tracePropertyInFlight = checkTrace Nothing reqsInFlight [] | reqsInFlight > 0 = counterexample - ("traceProeprtyInFlight: reqsInFlight = " ++ show reqsInFlight ++ " ≠ 0") + ("tracePropertyInFlight: reqsInFlight = " ++ show reqsInFlight ++ " ≠ 0") False | otherwise = property True diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node.hs index 2b79d3d0e9c..b3cd70cf6bf 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node.hs @@ -38,7 +38,7 @@ import Control.Monad.Class.MonadSay import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadThrow (MonadEvaluate, MonadMask, MonadThrow, SomeException) -import Control.Monad.Class.MonadTime.SI (DiffTime, MonadTime) +import Control.Monad.Class.MonadTime.SI (DiffTime, MonadTime, Time (..)) import Control.Monad.Class.MonadTimer.SI (MonadDelay, MonadTimer) import Control.Monad.Fix (MonadFix) import Control.Tracer (Tracer (..), nullTracer) @@ -66,6 +66,7 @@ import Ouroboros.Network.AnchoredFragment qualified as AF import Ouroboros.Network.Block (MaxSlotNo (..), maxSlotNoFromWithOrigin, pointSlot) import Ouroboros.Network.BlockFetch +import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation(..)) import Ouroboros.Network.ConnectionManager.Types (DataFlow (..)) import Ouroboros.Network.Diffusion qualified as Diff import Ouroboros.Network.Diffusion.P2P qualified as Diff.P2P @@ -290,11 +291,14 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch = (blockFetchPolicy nodeKernel) (nkFetchClientRegistry nodeKernel) (BlockFetchConfiguration { - bfcMaxConcurrencyBulkSync = 1, bfcMaxConcurrencyDeadline = 2, bfcMaxRequestsInflight = 10, - bfcDecisionLoopInterval = 0.01, - bfcSalt = 0 + bfcDecisionLoopIntervalBulkSync = 0.04, + bfcDecisionLoopIntervalDeadline = 0.01, + bfcSalt = 0, + bfcGenesisBFConfig = GenesisBlockFetchConfiguration + { gbfcBulkSyncGracePeriod = 10 -- seconds + } }) blockFetchPolicy :: NodeKernel BlockHeader Block s m @@ -323,7 +327,10 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch = blockMatchesHeader = \_ _ -> 
True, headerForgeUTCTime, - blockForgeUTCTime = headerForgeUTCTime . fmap blockHeader + blockForgeUTCTime = headerForgeUTCTime . fmap blockHeader, + + readChainSelStarvation = pure (ChainSelStarvationEndedAt (Time 0)), + demoteCSJDynamo = \_ -> pure () } where plausibleCandidateChain cur candidate = diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch.hs index 86d0fecea8b..e852cb0b815 100644 --- a/ouroboros-network/src/Ouroboros/Network/BlockFetch.hs +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} @@ -84,6 +85,7 @@ module Ouroboros.Network.BlockFetch ( blockFetchLogic , BlockFetchConfiguration (..) , BlockFetchConsensusInterface (..) + , GenesisBlockFetchConfiguration (..) -- ** Tracer types , FetchDecision , TraceFetchClientState (..) @@ -109,6 +111,8 @@ import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI import Control.Tracer (Tracer) +import GHC.Generics (Generic) + import Ouroboros.Network.Block import Ouroboros.Network.SizeInBytes (SizeInBytes) @@ -121,6 +125,7 @@ import Ouroboros.Network.BlockFetch.ConsensusInterface (BlockFetchConsensusInterface (..), FromConsensus (..), WhetherReceivingTentativeBlocks (..)) import Ouroboros.Network.BlockFetch.State +import Ouroboros.Network.BlockFetch.Decision.Trace (TraceDecisionEvent) @@ -128,9 +133,6 @@ import Ouroboros.Network.BlockFetch.State -- Should be determined by external local node config. data BlockFetchConfiguration = BlockFetchConfiguration { - -- | Maximum concurrent downloads during bulk syncing. - bfcMaxConcurrencyBulkSync :: !Word, - -- | Maximum concurrent downloads during deadline syncing. bfcMaxConcurrencyDeadline :: !Word, @@ -138,13 +140,30 @@ data BlockFetchConfiguration = bfcMaxRequestsInflight :: !Word, -- | Desired interval between calls to fetchLogicIteration - bfcDecisionLoopInterval :: !DiffTime, + -- in BulkSync mode + bfcDecisionLoopIntervalBulkSync :: !DiffTime, + + -- | Desired interval between calls to fetchLogicIteration + -- in Deadline mode + bfcDecisionLoopIntervalDeadline :: !DiffTime, -- | Salt used when comparing peers - bfcSalt :: !Int + bfcSalt :: !Int, + + -- | Genesis-specific parameters + bfcGenesisBFConfig :: !GenesisBlockFetchConfiguration } deriving (Show) +-- | BlockFetch configuration parameters specific to Genesis. +data GenesisBlockFetchConfiguration = + GenesisBlockFetchConfiguration + { -- | Grace period when starting to talk to a peer in bulk sync mode + -- during which it is fine if the chain selection gets starved. + gbfcBulkSyncGracePeriod :: !DiffTime + } + deriving (Eq, Generic, Show) + -- | Execute the block fetch logic. It monitors the current chain and candidate -- chains. It decided which block bodies to fetch and manages the process of -- fetching them, including making alternative decisions based on timeouts and @@ -157,11 +176,11 @@ blockFetchLogic :: forall addr header block m. 
, HasHeader block , HeaderHash header ~ HeaderHash block , MonadDelay m - , MonadSTM m + , MonadTimer m , Ord addr , Hashable addr ) - => Tracer m [TraceLabelPeer addr (FetchDecision [Point header])] + => Tracer m (TraceDecisionEvent addr header) -> Tracer m (TraceLabelPeer addr (TraceFetchClientState header)) -> BlockFetchConsensusInterface addr header block m -> FetchClientRegistry addr header block m @@ -179,6 +198,7 @@ blockFetchLogic decisionTracer clientStateTracer fetchDecisionPolicy fetchTriggerVariables fetchNonTriggerVariables + demoteCSJDynamo where mkFetchClientPolicy :: WhetherReceivingTentativeBlocks -> STM m (FetchClientPolicy header block m) mkFetchClientPolicy receivingTentativeBlocks = do @@ -193,11 +213,12 @@ blockFetchLogic decisionTracer clientStateTracer fetchDecisionPolicy :: FetchDecisionPolicy header fetchDecisionPolicy = FetchDecisionPolicy { - maxInFlightReqsPerPeer = bfcMaxRequestsInflight, - maxConcurrencyBulkSync = bfcMaxConcurrencyBulkSync, - maxConcurrencyDeadline = bfcMaxConcurrencyDeadline, - decisionLoopInterval = bfcDecisionLoopInterval, - peerSalt = bfcSalt, + maxInFlightReqsPerPeer = bfcMaxRequestsInflight, + maxConcurrencyDeadline = bfcMaxConcurrencyDeadline, + decisionLoopIntervalBulkSync = bfcDecisionLoopIntervalBulkSync, + decisionLoopIntervalDeadline = bfcDecisionLoopIntervalDeadline, + peerSalt = bfcSalt, + bulkSyncGracePeriod = gbfcBulkSyncGracePeriod bfcGenesisBFConfig, plausibleCandidateChain, compareCandidateChains, @@ -219,5 +240,6 @@ blockFetchLogic decisionTracer clientStateTracer readStatePeerStateVars = readFetchClientsStateVars registry, readStatePeerGSVs = readPeerGSVs registry, readStateFetchMode = readFetchMode, - readStateFetchedMaxSlotNo = readFetchedMaxSlotNo + readStateFetchedMaxSlotNo = readFetchedMaxSlotNo, + readStateChainSelStarvation = readChainSelStarvation } diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs index ca47dd93e0d..38b4d662a0e 100644 --- a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs @@ -26,7 +26,7 @@ import Control.Monad (unless) import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI -import Data.Set qualified as Set +import qualified Data.Set as Set import Control.Tracer (traceWith) diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/ClientState.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/ClientState.hs index 8e25804f662..9656ace0887 100644 --- a/ouroboros-network/src/Ouroboros/Network/BlockFetch/ClientState.hs +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/ClientState.hs @@ -32,13 +32,16 @@ module Ouroboros.Network.BlockFetch.ClientState -- * Ancillary , FromConsensus (..) , WhetherReceivingTentativeBlocks (..) + , PeersOrder(..) + , mcons + , msnoc ) where import Data.List (foldl') import Data.Maybe (mapMaybe) import Data.Semigroup (Last (..)) +import Data.Sequence (Seq, (|>), (<|)) import Data.Set (Set) -import Data.Set qualified as Set import Control.Concurrent.Class.MonadSTM.Strict import Control.Exception (assert) @@ -59,6 +62,7 @@ import Ouroboros.Network.ControlMessage (ControlMessageSTM, timeoutWithControlMessage) import Ouroboros.Network.Point (withOriginToMaybe) import Ouroboros.Network.Protocol.BlockFetch.Type (ChainRange (..)) +import qualified Data.Set as Set -- | The context that is passed into the block fetch protocol client when it -- is started. 
@@ -785,3 +789,23 @@ tryReadTMergeVar :: MonadSTM m => TMergeVar m a -> STM m (Maybe a) tryReadTMergeVar (TMergeVar v) = tryReadTMVar v + +-- | The order of peers for bulk sync fetch decisions. +data PeersOrder peer = PeersOrder + { peersOrderCurrent :: Maybe peer + -- ^ The current peer we are fetching from, if there is one. + , peersOrderAll :: Seq peer + -- ^ All the peers, from most preferred to least preferred. + -- + -- INVARIANT: If there is a current peer, it is always the head of this list. + , peersOrderStart :: Time + -- ^ The time at which we started talking to the current peer. + } + +mcons :: Maybe a -> Seq a -> Seq a +mcons Nothing xs = xs +mcons (Just x) xs = x <| xs + +msnoc :: Seq a -> Maybe a -> Seq a +msnoc xs Nothing = xs +msnoc xs (Just x) = xs |> x diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs index de652e1f53e..287b33969b4 100644 --- a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs @@ -16,1136 +16,83 @@ module Ouroboros.Network.BlockFetch.Decision -- ** Components of the decision-making process , filterPlausibleCandidates , selectForkSuffixes - , filterNotAlreadyFetched - , filterNotAlreadyInFlightWithPeer + , dropAlreadyFetched + , dropAlreadyInFlightWithPeer , prioritisePeerChains - , filterNotAlreadyInFlightWithOtherPeers , fetchRequestDecisions ) where -import Data.Set qualified as Set - -import Data.Function (on) import Data.Hashable -import Data.List (foldl', groupBy, sortBy, transpose) -import Data.Maybe (mapMaybe) -import Data.Set (Set) -import GHC.Stack (HasCallStack) - -import Control.Exception (assert) -import Control.Monad (guard) -import Control.Monad.Class.MonadTime.SI (DiffTime) +import Control.Monad.Class.MonadTime.SI (MonadMonotonicTime(..)) +import Control.Tracer (Tracer) -import Ouroboros.Network.AnchoredFragment (AnchoredFragment, AnchoredSeq (..)) -import Ouroboros.Network.AnchoredFragment qualified as AF +import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import Ouroboros.Network.Block -import Ouroboros.Network.Point (withOriginToMaybe) - -import Ouroboros.Network.BlockFetch.ClientState (FetchRequest (..), - PeerFetchInFlight (..), PeerFetchStatus (..)) -import Ouroboros.Network.BlockFetch.ConsensusInterface (FetchMode (..)) -import Ouroboros.Network.BlockFetch.DeltaQ (PeerFetchInFlightLimits (..), - PeerGSV (..), SizeInBytes, calculatePeerFetchInFlightLimits, - comparePeerGSV, comparePeerGSV', estimateExpectedResponseDuration, - estimateResponseDeadlineProbability) - - -data FetchDecisionPolicy header = FetchDecisionPolicy { - maxInFlightReqsPerPeer :: Word, -- A protocol constant. - - maxConcurrencyBulkSync :: Word, - maxConcurrencyDeadline :: Word, - decisionLoopInterval :: DiffTime, - peerSalt :: Int, - - plausibleCandidateChain :: HasCallStack - => AnchoredFragment header - -> AnchoredFragment header -> Bool, - - compareCandidateChains :: HasCallStack - => AnchoredFragment header - -> AnchoredFragment header - -> Ordering, - - blockFetchSize :: header -> SizeInBytes - } - - -type PeerInfo header peer extra = - ( PeerFetchStatus header, - PeerFetchInFlight header, - PeerGSV, - peer, - extra - ) - --- | Throughout the decision making process we accumulate reasons to decline --- to fetch any blocks. This type is used to wrap intermediate and final --- results. 
--- -type FetchDecision result = Either FetchDecline result - --- | All the various reasons we can decide not to fetch blocks from a peer. --- --- It is worth highlighting which of these reasons result from competition --- among upstream peers. --- --- * 'FetchDeclineInFlightOtherPeer': decline this peer because all the --- unfetched blocks of its candidate chain have already been requested from --- other peers. This reason reflects the least-consequential competition --- among peers: the competition that determines merely which upstream peer to --- burden with the request (eg the one with the best --- 'Ouroboros.Network.BlockFetch.DeltaQ.DeltaQ' metrics). The consequences --- are relatively minor because the unfetched blocks on this peer's candidate --- chain will be requested regardless; it's merely a question of "From who?". --- (One exception: if an adversarial peer wins this competition such that the --- blocks are only requested from them, then it may be possible that this --- decision determines whether the blocks are ever /received/. But that --- depends on details of timeouts, a longer competing chain being soon --- received within those timeouts, and so on.) --- --- * 'FetchDeclineChainNotPlausible': decline this peer because the node has --- already fetched, validated, and selected a chain better than its candidate --- chain from other peers (or from the node's own block forge). Because the --- node's current selection is influenced by what blocks other peers have --- recently served (or it recently minted), this reason reflects that peers --- /indirectly/ compete by serving as long of a chain as possible and as --- promptly as possible. When the tips of the peers' selections are all --- within their respective forecast horizons (see --- 'Ouroboros.Consensus.Ledger.SupportsProtocol.ledgerViewForecastAt'), then --- the length of their candidate chains will typically be the length of their --- selections, since the ChainSync is free to race ahead (in contrast, the --- BlockFetch pipeline depth is bounded such that it will, for a syncing --- node, not be able to request all blocks between the selection and the end --- of the forecast window). But if one or more of their tips is beyond the --- horizon, then the relative length of the candidate chains is more --- complicated, influenced by both the relative density of the chains' --- suffixes and the relative age of the chains' intersection with the node's --- selection (since each peer's forecast horizon is a fixed number of slots --- after the candidate's successor of that intersection). --- --- * 'FetchDeclineConcurrencyLimit': decline this peer while the node has --- already fully allocated the artificially scarce 'maxConcurrentFetchPeers' --- resource amongst its other peers. This reason reflects the --- least-fundamental competition: it's the only way a node would decline a --- candidate chain C that it would immediately switch to if C had somehow --- already been fetched (and any better current candidates hadn't). It is --- possible that this peer's candidate fragment is better than the candidate --- fragments of other peers, but that should only happen ephemerally (eg for --- a brief while immediately after first connecting to this peer). --- --- * 'FetchDeclineChainIntersectionTooDeep': decline this peer because the node's --- selection has more than @K@ blocks that are not on this peer's candidate --- chain. 
Typically, this reason occurs after the node has been declined---ie --- lost the above competitions---for a long enough duration. This decision --- only arises if the BlockFetch decision logic wins a harmless race against --- the ChainSync client once the node's selection gets longer, since --- 'Ouroboros.Consensus.MiniProtocol.ChainSync.Client.ForkTooDeep' --- disconnects from such a peer. --- -data FetchDecline = - -- | This peer's candidate chain is not longer than our chain. For more - -- details see - -- 'Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface.mkBlockFetchConsensusInterface' - -- which implements 'plausibleCandidateChain'. - -- - FetchDeclineChainNotPlausible - - -- | Switching to this peer's candidate chain would require rolling back - -- more than @K@ blocks. - -- - | FetchDeclineChainIntersectionTooDeep - - -- | Every block on this peer's candidate chain has already been fetched. - -- - | FetchDeclineAlreadyFetched - - -- | This peer's candidate chain has already been requested from this - -- peer. - -- - | FetchDeclineInFlightThisPeer - - -- | Some blocks on this peer's candidate chain have not yet been fetched, - -- but all of those have already been requested from other peers. - -- - | FetchDeclineInFlightOtherPeer - - -- | This peer's BlockFetch client is shutting down, see - -- 'PeerFetchStatusShutdown'. - -- - | FetchDeclinePeerShutdown - - -- | Blockfetch is starting up and waiting on corresponding Chainsync. - | FetchDeclinePeerStarting - - - -- The reasons above this comment are fundamental and/or obvious. On the - -- other hand, the reasons below are heuristic. - - - -- | This peer is in a potentially-temporary state in which it has not - -- responded to us within a certain expected time limit, see - -- 'PeerFetchStatusAberrant'. - -- - | FetchDeclinePeerSlow - - -- | This peer is not under the 'maxInFlightReqsPerPeer' limit. - -- - -- The argument is the 'maxInFlightReqsPerPeer' constant. - -- - | FetchDeclineReqsInFlightLimit !Word - - -- | This peer is not under the 'inFlightBytesHighWatermark' bytes limit. - -- - -- The arguments are: - -- - -- * number of bytes currently in flight for that peer - -- * the configured 'inFlightBytesLowWatermark' constant - -- * the configured 'inFlightBytesHighWatermark' constant - -- - | FetchDeclineBytesInFlightLimit !SizeInBytes !SizeInBytes !SizeInBytes - - -- | This peer is not under the 'inFlightBytesLowWatermark'. - -- - -- The arguments are: - -- - -- * number of bytes currently in flight for that peer - -- * the configured 'inFlightBytesLowWatermark' constant - -- * the configured 'inFlightBytesHighWatermark' constant - -- - | FetchDeclinePeerBusy !SizeInBytes !SizeInBytes !SizeInBytes - - -- | The node is not under the 'maxConcurrentFetchPeers' limit. - -- - -- The arguments are: - -- - -- * the current 'FetchMode' - -- * the corresponding configured limit constant, either - -- 'maxConcurrencyBulkSync' or 'maxConcurrencyDeadline' - -- - | FetchDeclineConcurrencyLimit !FetchMode !Word - deriving (Eq, Show) - - --- | The \"oh noes?!\" operator. --- --- In the case of an error, the operator provides a specific error value. --- -(?!) :: Maybe a -> e -> Either e a -Just x ?! _ = Right x -Nothing ?! e = Left e - --- | The combination of a 'ChainSuffix' and a list of discontiguous --- 'AnchoredFragment's: --- --- * When comparing two 'CandidateFragments' as candidate chains, we use the --- 'ChainSuffix'. 
--- --- * To track which blocks of that candidate still have to be downloaded, we --- use a list of discontiguous 'AnchoredFragment's. --- -type CandidateFragments header = (ChainSuffix header, [AnchoredFragment header]) +import Ouroboros.Network.BlockFetch.ClientState (FetchRequest (..), PeersOrder (..)) +import Ouroboros.Network.BlockFetch.ConsensusInterface (FetchMode (..), ChainSelStarvation) +import Ouroboros.Network.BlockFetch.Decision.Deadline (FetchDecisionPolicy (..), PeerInfo, FetchDecision, FetchDecline (..), + filterPlausibleCandidates, dropAlreadyFetched, dropAlreadyInFlightWithPeer, + selectForkSuffixes, fetchDecisionsDeadline, prioritisePeerChains, fetchRequestDecisions) +import Ouroboros.Network.BlockFetch.Decision.BulkSync (fetchDecisionsBulkSyncM) +import Ouroboros.Network.BlockFetch.Decision.Trace (TraceDecisionEvent) fetchDecisions - :: (Ord peer, + :: forall peer header block m extra. + (Ord peer, Hashable peer, HasHeader header, - HeaderHash header ~ HeaderHash block) - => FetchDecisionPolicy header + HeaderHash header ~ HeaderHash block, MonadMonotonicTime m) + => Tracer m (TraceDecisionEvent peer header) + -> FetchDecisionPolicy header -> FetchMode -> AnchoredFragment header -> (Point block -> Bool) -> MaxSlotNo + -> ChainSelStarvation + -> ( PeersOrder peer + , PeersOrder peer -> m () + , peer -> m () + ) -> [(AnchoredFragment header, PeerInfo header peer extra)] - -> [(FetchDecision (FetchRequest header), PeerInfo header peer extra)] -fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy { - plausibleCandidateChain, - compareCandidateChains, - blockFetchSize, - peerSalt - } - fetchMode - currentChain - fetchedBlocks - fetchedMaxSlotNo = - - -- Finally, make a decision for each (chain, peer) pair. - fetchRequestDecisions - fetchDecisionPolicy - fetchMode - . map swizzleSIG - - -- Filter to keep blocks that are not already in-flight with other peers. - . filterNotAlreadyInFlightWithOtherPeers - fetchMode - . map swizzleSI - - -- Reorder chains based on consensus policy and network timing data. - . prioritisePeerChains - fetchMode - peerSalt - compareCandidateChains - blockFetchSize - . map swizzleIG - - -- Filter to keep blocks that are not already in-flight for this peer. - . filterNotAlreadyInFlightWithPeer - . map swizzleI - - -- Filter to keep blocks that have not already been downloaded. - . filterNotAlreadyFetched - fetchedBlocks - fetchedMaxSlotNo - - -- Select the suffix up to the intersection with the current chain. - . selectForkSuffixes - currentChain - - -- First, filter to keep chains the consensus layer tells us are plausible. - . filterPlausibleCandidates - plausibleCandidateChain - currentChain - where - -- Data swizzling functions to get the right info into each stage. - swizzleI (c, p@(_, inflight,_,_, _)) = (c, inflight, p) - swizzleIG (c, p@(_, inflight,gsvs,peer,_)) = (c, inflight, gsvs, peer, p) - swizzleSI (c, p@(status,inflight,_,_, _)) = (c, status, inflight, p) - swizzleSIG (c, p@(status,inflight,gsvs,peer,_)) = (c, status, inflight, gsvs, peer, p) - -{- -We have the node's /current/ or /adopted/ chain. This is the node's chain in -the sense specified by the Ouroboros algorithm. It is a fully verified chain -with block bodies and a ledger state. - - ┆ ┆ - ├───┤ - │ │ - ├───┤ - │ │ - ├───┤ - │ │ - ├───┤ - │ │ - ───┴───┴─── current chain length (block number) - -With chain selection we are interested in /candidate/ chains. We have these -candidate chains in the form of chains of verified headers, but without bodies. 
- -The consensus layer gives us the current set of candidate chains from our peers -and we have the task of selecting which block bodies to download, and then -passing those block bodes back to the consensus layer. The consensus layer will -try to validate them and decide if it wants to update its current chain. - - ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ - ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ - ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ - ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ - ├───┤ ├───┤ ├───┤ ├───┤ └───┘ - │ │ │ │ │ │ │ │ - ───┴───┴─────┼───┼─────┼───┼─────┼───┼───────────── current chain length - │ │ │ │ │ │ - current ├───┤ ├───┤ └───┘ - (blocks) │ │ │ │ - └───┘ └───┘ - A B C D - candidates - (headers) - -In this example we have four candidate chains, with all but chain D strictly -longer than our current chain. - -In general there are many candidate chains. We make a distinction between a -candidate chain and the peer from which it is available. It is often the -case that the same chain is available from multiple peers. We will try to be -clear about when we are referring to chains or the combination of a chain and -the peer from which it is available. - -For the sake of the example let us assume we have the four chains above -available from the following peers. - -peer 1 2 3 4 5 6 7 - ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ - ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ │ │ │ │ - ├───┤ ├───┤ ├───┤ ├───┤ └───┘ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ │ │ - ──┼───┼─────┼───┼─────┼───┼─────┼───┼───────────────┼───┼─────┼───┼── - │ │ │ │ │ │ │ │ │ │ │ │ - └───┘ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ - └───┘ └───┘ └───┘ └───┘ └───┘ -chain C A B A D B A - -This is the form in which we are informed about candidate chains from the -consensus layer, the combination of a chain and the peer it is from. This -makes sense, since these things change independently. - -We will process the chains in this form, keeping the peer/chain combination all -the way through. Although there could in principle be some opportunistic saving -by sharing when multiple peers provide the same chain, taking advantage of this -adds complexity and does nothing to improve our worst case costs. - -We are only interested in candidate chains that are strictly longer than our -current chain. So our first task is to filter down to this set. --} - - --- | Keep only those candidate chains that are preferred over the current --- chain. Typically, this means that their length is longer than the length of --- the current chain. --- -filterPlausibleCandidates - :: (AnchoredFragment block -> AnchoredFragment header -> Bool) - -> AnchoredFragment block -- ^ The current chain - -> [(AnchoredFragment header, peerinfo)] - -> [(FetchDecision (AnchoredFragment header), peerinfo)] -filterPlausibleCandidates plausibleCandidateChain currentChain chains = - [ (chain', peer) - | (chain, peer) <- chains - , let chain' = do - guard (plausibleCandidateChain currentChain chain) - ?! FetchDeclineChainNotPlausible - return chain - ] - - -{- -In the example, this leaves us with only the candidate chains: A, B and C, but -still paired up with the various peers. 
- - -peer 1 2 3 4 6 7 - ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ - ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ │ │ - ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ │ │ - ──┼───┼─────┼───┼─────┼───┼─────┼───┼───────────────┼───┼─────┼───┼── - │ │ │ │ │ │ │ │ │ │ │ │ - └───┘ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ - │ │ │ │ │ │ │ │ │ │ - └───┘ └───┘ └───┘ └───┘ └───┘ -chain C A B A B A --} - - -{- -Of course we would at most need to download the blocks in a candidate chain -that are not already in the current chain. So we must find those intersections. - -Before we do that, lets define how we represent a suffix of a chain. We do this -very simply as a chain fragment: exactly those blocks contained in the suffix. -A chain fragment is of course not a chain, but has many similar invariants. - -We will later also need to represent chain ranges when we send block fetch -requests. We do this using a pair of points: the first and last blocks in the -range. While we can represent an empty chain fragment, we cannot represent an -empty fetch range, but this is ok since we never request empty ranges. - - Chain fragment - ┌───┐ - │ ◉ │ Start of range, inclusive - ├───┤ - │ │ - ├───┤ - │ │ - ├───┤ - │ │ - ├───┤ - │ ◉ │ End of range, inclusive. - └───┘ --} - --- | A chain suffix, obtained by intersecting a candidate chain with the --- current chain. --- --- The anchor point of a 'ChainSuffix' will be a point within the bounds of --- the current chain ('AF.withinFragmentBounds'), indicating that it forks off --- in the last @K@ blocks. --- --- A 'ChainSuffix' must be non-empty, as an empty suffix, i.e. the candidate --- chain is equal to the current chain, would not be a plausible candidate. -newtype ChainSuffix header = - ChainSuffix { getChainSuffix :: AnchoredFragment header } - -{- -We define the /chain suffix/ as the suffix of the candidate chain up until (but -not including) where it intersects the current chain. - - - current peer 1 peer 2 - - ┆ ┆ - ├───┤ - │ ◀┿━━━━━━━━━━━━━━━━━┓ - ├───┤ ┌─╂─┐ - │ │ │ ◉ │ - ├───┤ ├───┤ - │ │ │ │ - ├───┤ ├───┤ - │ ◀┿━━━━━━━┓ │ │ - ───┴───┴─────┬─╂─┬─────┼───┼─── - │ ◉ │ │ │ - └───┘ ├───┤ - │ ◉ │ - └───┘ - C A - -In this example we found that C was a strict extension of the current chain -and chain A was a short fork. - -Note that it's possible that we don't find any intersection within the last K -blocks. This means the candidate forks by more than K and so we are not -interested in this candidate at all. --} - --- | Find the chain suffix for a candidate chain, with respect to the --- current chain. --- -chainForkSuffix - :: (HasHeader header, HasHeader block, - HeaderHash header ~ HeaderHash block) - => AnchoredFragment block -- ^ Current chain. - -> AnchoredFragment header -- ^ Candidate chain - -> Maybe (ChainSuffix header) -chainForkSuffix current candidate = - case AF.intersect current candidate of - Nothing -> Nothing - Just (_, _, _, candidateSuffix) -> - -- If the suffix is empty, it means the candidate chain was equal to - -- the current chain and didn't fork off. Such a candidate chain is - -- not a plausible candidate, so it must have been filtered out. 
- assert (not (AF.null candidateSuffix)) $ - Just (ChainSuffix candidateSuffix) - -selectForkSuffixes - :: (HasHeader header, HasHeader block, - HeaderHash header ~ HeaderHash block) - => AnchoredFragment block - -> [(FetchDecision (AnchoredFragment header), peerinfo)] - -> [(FetchDecision (ChainSuffix header), peerinfo)] -selectForkSuffixes current chains = - [ (mchain', peer) - | (mchain, peer) <- chains - , let mchain' = do - chain <- mchain - chainForkSuffix current chain ?! FetchDeclineChainIntersectionTooDeep - ] - -{- -We define the /fetch range/ as the suffix of the fork range that has not yet -had its blocks downloaded and block content checked against the headers. + -> m [(FetchDecision (FetchRequest header), PeerInfo header peer extra)] - ┆ ┆ - ├───┤ - │ │ - ├───┤ ┌───┐ - │ │ already │ │ - ├───┤ fetched ├───┤ - │ │ blocks │ │ - ├───┤ ├───┤ - │ │ │░◉░│ ◄ fetch range - ───┴───┴─────┬───┬─────┼───┼─── - │░◉░│ ◄ │░░░│ - └───┘ ├───┤ - │░◉░│ ◄ - └───┘ - -In earlier versions of this scheme we maintained and relied on the invariant -that the ranges of fetched blocks are backwards closed. This meant we never had -discontinuous ranges of fetched or not-yet-fetched blocks. This invariant does -simplify things somewhat by keeping the ranges continuous however it precludes -fetching ranges of blocks from different peers in parallel. - -We do not maintain any such invariant and so we have to deal with there being -gaps in the ranges we have already fetched or are yet to fetch. To keep the -tracking simple we do not track the ranges themselves, rather we track the set -of individual blocks without their relationship to each other. - --} - --- | Find the fragments of the chain suffix that we still need to fetch, these --- are the fragments covering blocks that have not yet been fetched and are --- not currently in the process of being fetched from this peer. --- --- Typically this is a single fragment forming a suffix of the chain, but in --- the general case we can get a bunch of discontiguous chain fragments. --- -filterNotAlreadyFetched - :: (HasHeader header, HeaderHash header ~ HeaderHash block) - => (Point block -> Bool) - -> MaxSlotNo - -> [(FetchDecision (ChainSuffix header), peerinfo)] - -> [(FetchDecision (CandidateFragments header), peerinfo)] -filterNotAlreadyFetched alreadyDownloaded fetchedMaxSlotNo chains = - [ (mcandidates, peer) - | (mcandidate, peer) <- chains - , let mcandidates = do - candidate <- mcandidate - let fragments = filterWithMaxSlotNo - notAlreadyFetched - fetchedMaxSlotNo - (getChainSuffix candidate) - guard (not (null fragments)) ?! FetchDeclineAlreadyFetched - return (candidate, fragments) - ] - where - notAlreadyFetched = not . alreadyDownloaded . castPoint . blockPoint - - -filterNotAlreadyInFlightWithPeer - :: HasHeader header - => [(FetchDecision (CandidateFragments header), PeerFetchInFlight header, - peerinfo)] - -> [(FetchDecision (CandidateFragments header), peerinfo)] -filterNotAlreadyInFlightWithPeer chains = - [ (mcandidatefragments', peer) - | (mcandidatefragments, inflight, peer) <- chains - , let mcandidatefragments' = do - (candidate, chainfragments) <- mcandidatefragments - let fragments = concatMap (filterWithMaxSlotNo - (notAlreadyInFlight inflight) - (peerFetchMaxSlotNo inflight)) - chainfragments - guard (not (null fragments)) ?! 
FetchDeclineInFlightThisPeer - return (candidate, fragments) - ] - where - notAlreadyInFlight inflight b = - blockPoint b `Set.notMember` peerFetchBlocksInFlight inflight - - --- | A penultimate step of filtering, but this time across peers, rather than --- individually for each peer. If we're following the parallel fetch --- mode then we filter out blocks that are already in-flight with other --- peers. --- --- Note that this does /not/ cover blocks that are proposed to be fetched in --- this round of decisions. That step is covered in 'fetchRequestDecisions'. --- -filterNotAlreadyInFlightWithOtherPeers - :: HasHeader header - => FetchMode - -> [( FetchDecision [AnchoredFragment header] - , PeerFetchStatus header - , PeerFetchInFlight header - , peerinfo )] - -> [(FetchDecision [AnchoredFragment header], peerinfo)] - -filterNotAlreadyInFlightWithOtherPeers FetchModeDeadline chains = - [ (mchainfragments, peer) - | (mchainfragments, _, _, peer) <- chains ] - -filterNotAlreadyInFlightWithOtherPeers FetchModeBulkSync chains = - [ (mcandidatefragments', peer) - | (mcandidatefragments, _, _, peer) <- chains - , let mcandidatefragments' = do - chainfragments <- mcandidatefragments - let fragments = concatMap (filterWithMaxSlotNo - notAlreadyInFlight - maxSlotNoInFlightWithOtherPeers) - chainfragments - guard (not (null fragments)) ?! FetchDeclineInFlightOtherPeer - return fragments - ] - where - notAlreadyInFlight b = - blockPoint b `Set.notMember` blocksInFlightWithOtherPeers - - -- All the blocks that are already in-flight with all peers - blocksInFlightWithOtherPeers = - Set.unions - [ case status of - PeerFetchStatusShutdown -> Set.empty - PeerFetchStatusStarting -> Set.empty - PeerFetchStatusAberrant -> Set.empty - _other -> peerFetchBlocksInFlight inflight - | (_, status, inflight, _) <- chains ] - - -- The highest slot number that is or has been in flight for any peer. - maxSlotNoInFlightWithOtherPeers = foldl' max NoMaxSlotNo - [ peerFetchMaxSlotNo inflight | (_, _, inflight, _) <- chains ] - --- | Filter a fragment. This is an optimised variant that will behave the same --- as 'AnchoredFragment.filter' if the following precondition is satisfied: --- --- PRECONDITION: for all @hdr@ in the chain fragment: if @blockSlot hdr > --- maxSlotNo@ then the predicate should not hold for any header after @hdr@ in --- the chain fragment. --- --- For example, when filtering out already downloaded blocks from the --- fragment, it does not make sense to keep filtering after having encountered --- the highest slot number the ChainDB has seen so far: blocks with a greater --- slot number cannot have been downloaded yet. When the candidate fragments --- get far ahead of the current chain, e.g., @2k@ headers, this optimisation --- avoids the linear cost of filtering these headers when we know in advance --- they will all remain in the final fragment. In case the given slot number --- is 'NoSlotNo', no filtering takes place, as there should be no matches --- because we haven't downloaded any blocks yet. --- --- For example, when filtering out blocks already in-flight for the given --- peer, the given @maxSlotNo@ can correspond to the block with the highest --- slot number that so far has been in-flight for the given peer. When no --- blocks have been in-flight yet, @maxSlotNo@ can be 'NoSlotNo', in which --- case no filtering needs to take place, which makes sense, as there are no --- blocks to filter out. 
Note that this is conservative: if a block is for --- some reason multiple times in-flight (maybe it has to be redownloaded) and --- the block's slot number matches the @maxSlotNo@, it will now be filtered --- (while the filtering might previously have stopped before encountering the --- block in question). This is fine, as the filter will now include the block, --- because according to the filtering predicate, the block is not in-flight. -filterWithMaxSlotNo - :: forall header. HasHeader header - => (header -> Bool) - -> MaxSlotNo -- ^ @maxSlotNo@ - -> AnchoredFragment header - -> [AnchoredFragment header] -filterWithMaxSlotNo p maxSlotNo = - AF.filterWithStop p ((> maxSlotNo) . MaxSlotNo . blockSlot) - -prioritisePeerChains - :: forall extra header peer. - ( HasHeader header - , Hashable peer - , Ord peer - ) - => FetchMode - -> Int - -> (AnchoredFragment header -> AnchoredFragment header -> Ordering) - -> (header -> SizeInBytes) - -> [(FetchDecision (CandidateFragments header), PeerFetchInFlight header, - PeerGSV, - peer, - extra )] - -> [(FetchDecision [AnchoredFragment header], extra)] -prioritisePeerChains FetchModeDeadline salt compareCandidateChains blockFetchSize = - map (\(decision, peer) -> - (fmap (\(_,_,fragment) -> fragment) decision, peer)) - . concatMap ( concat - . transpose - . groupBy (equatingFst - (equatingRight - ((==) `on` chainHeadPoint))) - . sortBy (comparingFst - (comparingRight - (compare `on` chainHeadPoint))) - ) - . groupBy (equatingFst - (equatingRight - (equatingPair - -- compare on probability band first, then preferred chain - (==) - (equateCandidateChains `on` getChainSuffix) - `on` - (\(band, chain, _fragments) -> (band, chain))))) - . sortBy (descendingOrder - (comparingFst - (comparingRight - (comparingPair - -- compare on probability band first, then preferred chain - compare - (compareCandidateChains `on` getChainSuffix) - `on` - (\(band, chain, _fragments) -> (band, chain)))))) - . map annotateProbabilityBand - . sortBy (\(_,_,a,ap,_) (_,_,b,bp,_) -> - comparePeerGSV' salt (a,ap) (b,bp)) - where - annotateProbabilityBand (Left decline, _, _, _, peer) = (Left decline, peer) - annotateProbabilityBand (Right (chain,fragments), inflight, gsvs, _, peer) = - (Right (band, chain, fragments), peer) - where - band = probabilityBand $ - estimateResponseDeadlineProbability - gsvs - (peerFetchBytesInFlight inflight) - (totalFetchSize blockFetchSize fragments) - deadline - - deadline = 2 -- seconds -- TODO: get this from external info - - equateCandidateChains chain1 chain2 - | EQ <- compareCandidateChains chain1 chain2 = True - | otherwise = False - - chainHeadPoint (_,ChainSuffix c,_) = AF.headPoint c - -prioritisePeerChains FetchModeBulkSync salt compareCandidateChains blockFetchSize = - map (\(decision, peer) -> - (fmap (\(_, _, fragment) -> fragment) decision, peer)) - . sortBy (comparingFst - (comparingRight - (comparingPair - -- compare on preferred chain first, then duration - (compareCandidateChains `on` getChainSuffix) - compare - `on` - (\(duration, chain, _fragments) -> (chain, duration))))) - . map annotateDuration - . sortBy (\(_,_,a,ap,_) (_,_,b,bp,_) -> - comparePeerGSV' salt (a,ap) (b,bp)) - where - annotateDuration (Left decline, _, _, _, peer) = (Left decline, peer) - annotateDuration (Right (chain,fragments), inflight, gsvs, _, peer) = - (Right (duration, chain, fragments), peer) - where - -- TODO: consider if we should put this into bands rather than just - -- taking the full value. 
- duration = estimateExpectedResponseDuration - gsvs - (peerFetchBytesInFlight inflight) - (totalFetchSize blockFetchSize fragments) - -totalFetchSize :: (header -> SizeInBytes) - -> [AnchoredFragment header] - -> SizeInBytes -totalFetchSize blockFetchSize fragments = - sum [ blockFetchSize header - | fragment <- fragments - , header <- AF.toOldestFirst fragment ] - -type Comparing a = a -> a -> Ordering -type Equating a = a -> a -> Bool - -descendingOrder :: Comparing a -> Comparing a -descendingOrder cmp = flip cmp - -comparingPair :: Comparing a -> Comparing b -> Comparing (a, b) -comparingPair cmpA cmpB (a1, b1) (a2, b2) = cmpA a1 a2 <> cmpB b1 b2 - -equatingPair :: Equating a -> Equating b -> Equating (a, b) -equatingPair eqA eqB (a1, b1) (a2, b2) = eqA a1 a2 && eqB b1 b2 - -comparingEither :: Comparing a -> Comparing b -> Comparing (Either a b) -comparingEither _ _ (Left _) (Right _) = LT -comparingEither cmpA _ (Left x) (Left y) = cmpA x y -comparingEither _ cmpB (Right x) (Right y) = cmpB x y -comparingEither _ _ (Right _) (Left _) = GT - -equatingEither :: Equating a -> Equating b -> Equating (Either a b) -equatingEither _ _ (Left _) (Right _) = False -equatingEither eqA _ (Left x) (Left y) = eqA x y -equatingEither _ eqB (Right x) (Right y) = eqB x y -equatingEither _ _ (Right _) (Left _) = False - -comparingFst :: Comparing a -> Comparing (a, b) -comparingFst cmp = cmp `on` fst - -equatingFst :: Equating a -> Equating (a, b) -equatingFst eq = eq `on` fst - -comparingRight :: Comparing b -> Comparing (Either a b) -comparingRight = comparingEither mempty - -equatingRight :: Equating b -> Equating (Either a b) -equatingRight = equatingEither (\_ _ -> True) - --- | Given the probability of the download completing within the deadline, --- classify that into one of three broad bands: high, medium and low. --- --- The bands are --- --- * high: 98% -- 100% --- * medium: 75% -- 98% --- * low: 0% -- 75% --- -probabilityBand :: Double -> ProbabilityBand -probabilityBand p - | p > 0.98 = ProbabilityHigh - | p > 0.75 = ProbabilityModerate - | otherwise = ProbabilityLow - -- TODO: for hysteresis, increase probability if we're already using this peer - -data ProbabilityBand = ProbabilityLow - | ProbabilityModerate - | ProbabilityHigh - deriving (Eq, Ord, Show) - - -{- -In the second phase we walk over the prioritised fetch suffixes for each peer -and make a decision about whether we should initiate any new fetch requests. - -This decision is based on a number of factors: - - * Is the fetch suffix empty? If so, there's nothing to do. - * Do we already have block fetch requests in flight with this peer? - * If so are we under the maximum number of in-flight blocks for this peer? - * Is this peer still performing within expectations or has it missed any soft - time outs? - * Has the peer missed any hard timeouts or otherwise been disconnected. - * Are we at our soft or hard limit of the number of peers we are prepared to - fetch blocks from concurrently? - -We look at each peer chain fetch suffix one by one. Of course decisions we -make earlier can affect decisions later, in particular the number of peers we -fetch from concurrently can increase if we fetch from a new peer, and we must -obviously take that into account when considering later peer chains. --} - - -fetchRequestDecisions - :: forall extra header peer. 
- ( Hashable peer - , HasHeader header - , Ord peer - ) - => FetchDecisionPolicy header - -> FetchMode - -> [( FetchDecision [AnchoredFragment header] - , PeerFetchStatus header - , PeerFetchInFlight header - , PeerGSV - , peer - , extra)] - -> [(FetchDecision (FetchRequest header), extra)] -fetchRequestDecisions fetchDecisionPolicy fetchMode chains = - go nConcurrentFetchPeers0 Set.empty NoMaxSlotNo chains - where - go :: Word - -> Set (Point header) - -> MaxSlotNo - -> [(Either FetchDecline [AnchoredFragment header], - PeerFetchStatus header, PeerFetchInFlight header, PeerGSV, peer, extra)] - -> [(FetchDecision (FetchRequest header), extra)] - go !_ !_ !_ [] = [] - go !nConcurrentFetchPeers !blocksFetchedThisRound !maxSlotNoFetchedThisRound - ((mchainfragments, status, inflight, gsvs, peer, extra) : cps) = - - (decision, extra) - : go nConcurrentFetchPeers' blocksFetchedThisRound' - maxSlotNoFetchedThisRound' cps - where - decision = fetchRequestDecision - fetchDecisionPolicy - fetchMode - -- Permit the preferred peers to by pass any concurrency limits. - (if elem peer nPreferedPeers then 0 - else nConcurrentFetchPeers) - (calculatePeerFetchInFlightLimits gsvs) - inflight - status - mchainfragments' - - mchainfragments' = - case fetchMode of - FetchModeDeadline -> mchainfragments - FetchModeBulkSync -> do - chainfragments <- mchainfragments - let fragments = - concatMap (filterWithMaxSlotNo - notFetchedThisRound - maxSlotNoFetchedThisRound) - chainfragments - guard (not (null fragments)) ?! FetchDeclineInFlightOtherPeer - return fragments - where - notFetchedThisRound h = - blockPoint h `Set.notMember` blocksFetchedThisRound - - nConcurrentFetchPeers' - -- increment if it was idle, and now will not be - | peerFetchReqsInFlight inflight == 0 - , Right{} <- decision = nConcurrentFetchPeers + 1 - | otherwise = nConcurrentFetchPeers - - -- This is only for avoiding duplication between fetch requests in this - -- round of decisions. Avoiding duplication with blocks that are already - -- in flight is handled by filterNotAlreadyInFlightWithOtherPeers - (blocksFetchedThisRound', maxSlotNoFetchedThisRound') = - case decision of - Left _ -> - (blocksFetchedThisRound, maxSlotNoFetchedThisRound) - Right (FetchRequest fragments) -> - (blocksFetchedThisRound `Set.union` blocksFetchedThisDecision, - maxSlotNoFetchedThisRound `max` maxSlotNoFetchedThisDecision) - where - maxSlotNoFetchedThisDecision = - foldl' max NoMaxSlotNo $ map MaxSlotNo $ - mapMaybe (withOriginToMaybe . AF.headSlot) fragments - - blocksFetchedThisDecision = - Set.fromList - [ blockPoint header - | fragment <- fragments - , header <- AF.toOldestFirst fragment ] - - nConcurrentFetchPeers0 = fromIntegral $ Set.size nActivePeers - - -- Set of peers with outstanding bytes. - nActivePeers :: Set peer - nActivePeers = - Set.fromList - . map snd - . filter (\(inFlight, _) -> inFlight > 0) - . map (\(_, _, PeerFetchInFlight{peerFetchReqsInFlight}, _, p, _) -> - (peerFetchReqsInFlight, p)) - $ chains - - -- Order the peers based on current PeerGSV. The top performing peers will be - -- permitted to go active even if we're above the desired maxConcurrentFetchPeers - -- which will cause us to switch smoothly from a slower to faster peers. - -- When switching from slow to faster peers we will be over the configured limit, but - -- PeerGSV is expected to be updated rather infrequently so the set of preferred peers should - -- be stable during 10s of seconds. - nPreferedPeers :: [peer] - nPreferedPeers = - map snd - . 
take (fromIntegral maxConcurrentFetchPeers) - . sortBy (\a b -> comparePeerGSV nActivePeers (peerSalt fetchDecisionPolicy) a b) - . map (\(_, _, _, gsv, p, _) -> (gsv, p)) - $ chains - - maxConcurrentFetchPeers :: Word - maxConcurrentFetchPeers = - case fetchMode of - FetchModeBulkSync -> maxConcurrencyBulkSync fetchDecisionPolicy - FetchModeDeadline -> maxConcurrencyDeadline fetchDecisionPolicy - - -fetchRequestDecision - :: HasHeader header - => FetchDecisionPolicy header - -> FetchMode - -> Word - -> PeerFetchInFlightLimits - -> PeerFetchInFlight header - -> PeerFetchStatus header - -> FetchDecision [AnchoredFragment header] - -> FetchDecision (FetchRequest header) - -fetchRequestDecision _ _ _ _ _ _ (Left decline) - = Left decline - -fetchRequestDecision _ _ _ _ _ PeerFetchStatusShutdown _ - = Left FetchDeclinePeerShutdown - -fetchRequestDecision _ _ _ _ _ PeerFetchStatusStarting _ - = Left FetchDeclinePeerStarting - -fetchRequestDecision _ _ _ _ _ PeerFetchStatusAberrant _ - = Left FetchDeclinePeerSlow - -fetchRequestDecision FetchDecisionPolicy { - maxConcurrencyBulkSync, - maxConcurrencyDeadline, - maxInFlightReqsPerPeer, - blockFetchSize - } - fetchMode - nConcurrentFetchPeers - PeerFetchInFlightLimits { - inFlightBytesLowWatermark, - inFlightBytesHighWatermark - } - PeerFetchInFlight { - peerFetchReqsInFlight, - peerFetchBytesInFlight - } - peerFetchStatus - (Right fetchFragments) - - | peerFetchReqsInFlight >= maxInFlightReqsPerPeer - = Left $ FetchDeclineReqsInFlightLimit - maxInFlightReqsPerPeer - - | peerFetchBytesInFlight >= inFlightBytesHighWatermark - = Left $ FetchDeclineBytesInFlightLimit - peerFetchBytesInFlight - inFlightBytesLowWatermark - inFlightBytesHighWatermark - - -- This covers the case when we could still fit in more reqs or bytes, but - -- we want to let it drop below a low water mark before sending more so we - -- get a bit more batching behaviour, rather than lots of 1-block reqs. - | peerFetchStatus == PeerFetchStatusBusy - = Left $ FetchDeclinePeerBusy - peerFetchBytesInFlight - inFlightBytesLowWatermark - inFlightBytesHighWatermark - - -- Refuse any blockrequest if we're above the concurrency limit. - | let maxConcurrentFetchPeers = case fetchMode of - FetchModeBulkSync -> maxConcurrencyBulkSync - FetchModeDeadline -> maxConcurrencyDeadline - , nConcurrentFetchPeers > maxConcurrentFetchPeers - = Left $ FetchDeclineConcurrencyLimit - fetchMode maxConcurrentFetchPeers - - -- If we're at the concurrency limit refuse any additional peers. - | peerFetchReqsInFlight == 0 - , let maxConcurrentFetchPeers = case fetchMode of - FetchModeBulkSync -> maxConcurrencyBulkSync - FetchModeDeadline -> maxConcurrencyDeadline - , nConcurrentFetchPeers == maxConcurrentFetchPeers - = Left $ FetchDeclineConcurrencyLimit - fetchMode maxConcurrentFetchPeers - - -- We've checked our request limit and our byte limit. We are then - -- guaranteed to get at least one non-empty request range. - | otherwise - = assert (peerFetchReqsInFlight < maxInFlightReqsPerPeer) $ - assert (not (null fetchFragments)) $ - - Right $ selectBlocksUpToLimits - blockFetchSize - peerFetchReqsInFlight - maxInFlightReqsPerPeer - peerFetchBytesInFlight - inFlightBytesHighWatermark - fetchFragments - - --- | --- --- Precondition: The result will be non-empty if --- --- Property: result is non-empty if preconditions satisfied --- -selectBlocksUpToLimits - :: forall header. 
HasHeader header - => (header -> SizeInBytes) -- ^ Block body size - -> Word -- ^ Current number of requests in flight - -> Word -- ^ Maximum number of requests in flight allowed - -> SizeInBytes -- ^ Current number of bytes in flight - -> SizeInBytes -- ^ Maximum number of bytes in flight allowed - -> [AnchoredFragment header] - -> FetchRequest header -selectBlocksUpToLimits blockFetchSize nreqs0 maxreqs nbytes0 maxbytes fragments = - assert (nreqs0 < maxreqs && nbytes0 < maxbytes && not (null fragments)) $ - -- The case that we are already over our limits has to be checked earlier, - -- outside of this function. From here on however we check for limits. - - let fragments' = goFrags nreqs0 nbytes0 fragments in - assert (all (not . AF.null) fragments') $ - FetchRequest fragments' - where - goFrags :: Word - -> SizeInBytes - -> [AnchoredFragment header] -> [AnchoredFragment header] - goFrags _ _ [] = [] - goFrags nreqs nbytes (c:cs) - | nreqs+1 > maxreqs = [] - | otherwise = goFrag (nreqs+1) nbytes (AF.Empty (AF.anchor c)) c cs - -- Each time we have to pick from a new discontiguous chain fragment then - -- that will become a new request, which contributes to our in-flight - -- request count. We never break the maxreqs limit. +fetchDecisions + _tracer + fetchDecisionPolicy + FetchModeDeadline + currentChain + fetchedBlocks + fetchedMaxSlotNo + _chainSelStarvation + _peersOrderHandlers + candidatesAndPeers + = + pure + $ fetchDecisionsDeadline + fetchDecisionPolicy + currentChain + fetchedBlocks + fetchedMaxSlotNo + candidatesAndPeers - goFrag :: Word - -> SizeInBytes - -> AnchoredFragment header - -> AnchoredFragment header - -> [AnchoredFragment header] -> [AnchoredFragment header] - goFrag nreqs nbytes c' (Empty _) cs = c' : goFrags nreqs nbytes cs - goFrag nreqs nbytes c' (b :< c) cs - | nbytes' >= maxbytes = [c' :> b] - | otherwise = goFrag nreqs nbytes' (c' :> b) c cs - where - nbytes' = nbytes + blockFetchSize b - -- Note that we always pick the one last block that crosses the maxbytes - -- limit. This cover the case where we otherwise wouldn't even be able to - -- request a single block, as it's too large. +fetchDecisions + tracer + fetchDecisionPolicy + FetchModeBulkSync + currentChain + fetchedBlocks + fetchedMaxSlotNo + chainSelStarvation + peersOrderHandlers + candidatesAndPeers + = + fetchDecisionsBulkSyncM + tracer + fetchDecisionPolicy + currentChain + fetchedBlocks + fetchedMaxSlotNo + chainSelStarvation + peersOrderHandlers + candidatesAndPeers diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/BulkSync.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/BulkSync.hs new file mode 100644 index 00000000000..e37e990a257 --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/BulkSync.hs @@ -0,0 +1,547 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | BulkSync decision logic +-- +-- This module contains the part of the block fetch decisions process that is +-- specific to the bulk sync mode. This logic reuses parts of the logic for the +-- deadline mode, but it is inherently different. +-- +-- Definitions: +-- +-- - Let @inflight :: peer -> Set blk@ be the outstanding blocks, those that +-- have been requested and are expected to arrive but have not yet. +-- +-- - Let @peersOrder@ be an order of preference among the peers. 
This order is +-- not set in stone and will evolve as we go. +-- +-- - Let @currentPeer :: Maybe peer@ be the “current peer” with which we are +-- interacting. If it exists, this peer must be the best according to +-- @peersOrder@, and the last fetch request must have been sent to them. +-- +-- - Let @currentStart :: Time@ be the latest time a fetch request was sent +-- while there were no outstanding blocks. +-- +-- - Let @gracePeriod@ be a small duration (eg. 10s), during which a “cold” peer +-- is allowed to warm up (eg. grow the TCP window) before being expected to +-- feed blocks faster than we can validate them. +-- +-- One iteration of this decision logic: +-- +-- 0. If @inflight(currentPeer)@ is non-empty and the block validation component +-- has idled at any point after @currentStart@ plus @gracePeriod@, then the +-- peer @currentPeer@ has failed to promptly serve @inflight(currentPeer)@, +-- and: +-- +-- - If @currentPeer@ is the ChainSync Jumping dynamo, then it must +-- immediately be replaced as the dynamo. +-- +-- - Stop considering the peer “current” and make them the worst according to +-- the @peersOrder@. +-- +-- 1. Select @theCandidate :: AnchoredFragment (Header blk)@. This is the best +-- candidate header chain among the ChainSync clients (eg. best raw +-- tiebreaker among the longest). +-- +-- 2. Select @thePeer :: peer@. +-- +-- - Let @grossRequest@ be the oldest block on @theCandidate@ that has not +-- already been downloaded. +-- +-- - If @grossRequest@ is empty, then terminate this iteration. Otherwise, +-- pick the best peer (according to @peersOrder@) offering the block in +-- @grossRequest@. +-- +-- 3. Craft the actual request to @thePeer@ asking blocks of @theCandidate@: +-- +-- - If the byte size of @inflight(thePeer)@ is below the low-water mark, +-- then terminate this iteration. +-- +-- - Decide and send the actual next batch request, as influenced by exactly +-- which blocks are actually already currently in-flight with @thePeer@. +-- +-- 4. If we went through the election of a new peer, replace @currentPeer@ and +-- put the new peer at the front of @peersOrder@. Also reset @currentStart@ +-- if @inflights(thePeer)@ is empty. +-- +-- Terminate this iteration. +-- +-- About the influence of in-flight requests +-- ----------------------------------------- +-- +-- One can note that in-flight requests are ignored when finding a new peer, but +-- considered when crafting the actual request to a chosen peer. This is by +-- design. We explain the rationale here. +-- +-- If a peer proves too slow, then we give up on it (see point 0. above), even +-- if it has requests in-flight. In subsequent selections of peers (point 2.), +-- the blocks in these requests will not be removed from @theCandidate@ as, as +-- far as we know, these requests might never return. +-- +-- When crafting the actual request, we do need to consider the in-flight +-- requests of the peer, to avoid clogging our network. If some of these +-- in-flight requests date from when the peer was previously “current”, this +-- means that we cycled through all the peers that provided @theCandidate@ and +-- they all failed to serve our blocks promptly. +-- +-- This is a degenerate case of the algorithm that might happen but only be +-- transient. Soon enough, @theCandidate@ should be honest (if the consensus +-- layer does its job correctly), and there should exist an honest peer ready to +-- serve @theCandidate@ promptly. 
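As a standalone illustration of step 0 above, the following sketch (not part of this patch) shows how a current peer that lets ChainSel starve after its grace period can be rotated to the back of the preference queue. The names here (PeersOrderSketch, demoteIfStarved, and Double standing in for Time) are simplifications invented for the example; the real logic lives in fetchDecisionsBulkSyncM below.

import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as Seq

-- Simplified stand-in for the peers-order state described above.
data PeersOrderSketch peer = PeersOrderSketch
  { sketchStart   :: Double      -- time the current peer got its first request
  , sketchCurrent :: Maybe peer  -- the current peer, kept at the front of 'sketchAll'
  , sketchAll     :: Seq peer    -- order of preference among all peers
  }

-- Demote the current peer if ChainSel starved after start + gracePeriod.
demoteIfStarved
  :: Double                 -- grace period
  -> Double                 -- time at which ChainSel last starved
  -> PeersOrderSketch peer
  -> PeersOrderSketch peer
demoteIfStarved gracePeriod starvedAt po@(PeersOrderSketch start (Just p) everyone)
  | starvedAt >= start + gracePeriod =
      po { sketchCurrent = Nothing
         , sketchAll     = Seq.drop 1 everyone |> p  -- move it to the back
         }
demoteIfStarved _ _ po = po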
+-- +-- Interactions with ChainSync Jumping (CSJ) +-- ----------------------------------------- +-- +-- Because we always require our peers to be able to serve a gross request +-- with an old block, peers with longer chains have a better chance to pass +-- this criteria and to be selected as current peer. The CSJ dynamo, being +-- always ahead of jumpers, has therefore more chances to be selected as the +-- current peer. It is still possible for a jumper or a disengaged peer to be +-- selected. +-- +-- If the current peer is the CSJ dynamo and it is a dishonest peer that retains +-- blocks, it will get multiple opportunities to do so since it will be selected +-- as the current peer more often. We therefore rotate the dynamo every time it +-- is the current peer and it fails to serve blocks promptly. +-- +-- About the gross request +-- ----------------------- +-- +-- We want to select a peer that is able to serve us a batch of oldest blocks +-- of @theCandidate@. However, not every peer will be able to deliver these +-- batches as they might be on different chains. We therefore select a peer only +-- if its candidate fragment contains the block in the gross request. In this +-- way, we ensure that the peer can serve at least one block that we wish to +-- fetch. +-- +-- If the peer cannot offer any more blocks after that, it will be rotated out +-- soon. +-- +module Ouroboros.Network.BlockFetch.Decision.BulkSync ( + fetchDecisionsBulkSyncM +) where + +import Control.Exception (assert) +import Control.Monad (guard) +import Control.Monad.Class.MonadTime.SI (MonadMonotonicTime (getMonotonicTime), addTime) +import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT)) +import Control.Monad.Writer.Strict (Writer, runWriter, MonadWriter (tell)) +import Control.Tracer (Tracer, traceWith) +import Data.Bifunctor (first, Bifunctor (..)) +import Data.Foldable (toList) +import Data.DList (DList) +import qualified Data.DList as DList +import qualified Data.List as List +import Data.Sequence (Seq (..), (><), (<|), (|>)) +import qualified Data.Sequence as Sequence +import qualified Data.Set as Set +import Data.Maybe (maybeToList) + +import Cardano.Prelude (partitionEithers) + +import Ouroboros.Network.AnchoredFragment (AnchoredFragment) +import qualified Ouroboros.Network.AnchoredFragment as AF +import Ouroboros.Network.Block +import Ouroboros.Network.BlockFetch.ClientState + (FetchRequest (..), PeersOrder (..), peerFetchBlocksInFlight) +import Ouroboros.Network.BlockFetch.ConsensusInterface (FetchMode(FetchModeBulkSync)) +import Ouroboros.Network.BlockFetch.DeltaQ (calculatePeerFetchInFlightLimits) +import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation(..)) + +import Ouroboros.Network.BlockFetch.Decision.Deadline +import Ouroboros.Network.BlockFetch.Decision.Trace (TraceDecisionEvent (..)) + +type WithDeclined peer = Writer (DList (FetchDecline, peer)) + +runWithDeclined :: WithDeclined peer a -> (a, DList (FetchDecline, peer)) +runWithDeclined = runWriter + +fetchDecisionsBulkSyncM + :: forall peer header block m extra. 
+ (Ord peer, + HasHeader header, + HeaderHash header ~ HeaderHash block, MonadMonotonicTime m) + => Tracer m (TraceDecisionEvent peer header) + -> FetchDecisionPolicy header + -> AnchoredFragment header + -> (Point block -> Bool) + -> MaxSlotNo + -> ChainSelStarvation + -> ( PeersOrder peer + , PeersOrder peer -> m () + , peer -> m () + ) + -> [(AnchoredFragment header, PeerInfo header peer extra)] + -> m [(FetchDecision (FetchRequest header), PeerInfo header peer extra)] +fetchDecisionsBulkSyncM + tracer + fetchDecisionPolicy@FetchDecisionPolicy {bulkSyncGracePeriod} + currentChain + fetchedBlocks + fetchedMaxSlotNo + chainSelStarvation + ( peersOrder0, + writePeersOrder, + demoteCSJDynamo + ) + candidatesAndPeers = do + peersOrder1 <- checkLastChainSelStarvation peersOrder0 + + let (peersOrder, orderedCandidatesAndPeers) = + alignPeersOrderWithActualPeers + (peerInfoPeer . snd) + (Sequence.fromList candidatesAndPeers) + peersOrder1 + + -- Compute the actual block fetch decision. This contains only declines and + -- at most one request. 'theDecision' is therefore a 'Maybe'. + let (theDecision, declines) = + fetchDecisionsBulkSync + fetchDecisionPolicy + currentChain + fetchedBlocks + fetchedMaxSlotNo + (toList orderedCandidatesAndPeers) + + newCurrentPeer = peerInfoPeer . snd <$> theDecision + + case theDecision of + Just (_, (_, inflight, _, _, _)) + | Set.null (peerFetchBlocksInFlight inflight) + -- If there were no blocks in flight, then this will be the first request, + -- so we take a new current time. + -> do + peersOrderStart <- getMonotonicTime + writePeersOrder $ setCurrentPeer newCurrentPeer peersOrder + { peersOrderStart } + | newCurrentPeer /= peersOrderCurrent peersOrder0 + -- If the new current peer is not the old one, then we update the current + -- peer + -> + writePeersOrder $ setCurrentPeer newCurrentPeer peersOrder + _ -> pure () + + pure $ + map (first Right) (maybeToList theDecision) + ++ map (first Left) declines + where + -- Align the peers order with the actual peers; this consists in removing + -- all peers from the peers order that are not in the actual peers list and + -- adding at the end of the peers order all the actual peers that were not + -- there before. + alignPeersOrderWithActualPeers :: forall d. + (d -> peer) -> Seq d -> PeersOrder peer -> (PeersOrder peer, Seq d) + alignPeersOrderWithActualPeers + peerOf + actualPeers + PeersOrder {peersOrderStart, peersOrderCurrent, peersOrderAll} = + let peersOrderAll' = ( do + p <- peersOrderAll + case List.find ((p ==) . peerOf) actualPeers of + Just d -> pure d + Nothing -> Empty + ) >< Sequence.filter ((`notElem` peersOrderAll) . peerOf) actualPeers + -- Set the current peer to Nothing if it is not at the front of + -- the list. + peersOrderCurrent' = do + peer <- peersOrderCurrent + guard (any ((peer ==) . peerOf) $ Sequence.take 1 peersOrderAll') + pure peer + in (PeersOrder + { peersOrderCurrent = peersOrderCurrent', + -- INVARIANT met: Current peer is at the front if it exists + peersOrderAll = fmap peerOf peersOrderAll', + peersOrderStart + } + , peersOrderAll' + ) + + -- If the chain selection has been starved recently, that is after the + -- current peer started (and a grace period), then the current peer is + -- bad. We push it at the end of the queue, demote it from CSJ dynamo, + -- and ignore its in-flight blocks for the future. 
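The peers-order alignment above can be pictured on plain lists as follows; this is a simplified stand-in (names invented for the example) for the Seq-based alignPeersOrderWithActualPeers, which additionally carries the candidate fragment alongside each peer.

-- Keep known peers in their existing order, drop peers that are gone, and
-- append newly discovered peers at the back.
alignWithActualPeers :: Eq peer => [peer] -> [peer] -> [peer]
alignWithActualPeers actualPeers peersOrder =
     filter (`elem` actualPeers) peersOrder
  ++ filter (`notElem` peersOrder) actualPeers

-- For example:
--   alignWithActualPeers ["p2","p4"] ["p1","p2","p3"]  ==  ["p2","p4"]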
+ checkLastChainSelStarvation :: PeersOrder peer -> m (PeersOrder peer) + checkLastChainSelStarvation + peersOrder@PeersOrder {peersOrderStart, peersOrderCurrent, peersOrderAll} = do + lastStarvationTime <- case chainSelStarvation of + ChainSelStarvationEndedAt time -> pure time + ChainSelStarvationOngoing -> getMonotonicTime + case peersOrderCurrent of + Just peer + | lastStarvationTime >= addTime bulkSyncGracePeriod peersOrderStart -> do + traceWith tracer (PeerStarvedUs peer) + demoteCSJDynamo peer + pure PeersOrder + { + peersOrderCurrent = Nothing, + -- INVARIANT met: there is no current peer + peersOrderAll = Sequence.drop 1 peersOrderAll |> peer, + peersOrderStart + } + _ -> pure peersOrder + + setCurrentPeer :: Maybe peer -> PeersOrder peer -> PeersOrder peer + setCurrentPeer Nothing peersOrder = peersOrder {peersOrderCurrent = Nothing} + setCurrentPeer (Just peer) peersOrder = + case Sequence.breakl ((peer ==)) (peersOrderAll peersOrder) of + (xs, p :<| ys) -> + peersOrder + { peersOrderCurrent = Just p, + -- INVARIANT met: Current peer is at the front + peersOrderAll = p <| xs >< ys + } + (_, Empty) -> peersOrder {peersOrderCurrent = Nothing} + +-- | Given a list of candidate fragments and their associated peers, choose what +-- to sync from who in the bulk sync mode. +fetchDecisionsBulkSync :: forall header block peer extra. + ( HasHeader header, + HeaderHash header ~ HeaderHash block + ) => + FetchDecisionPolicy header -> + -- | The current chain, anchored at the immutable tip. + AnchoredFragment header -> + (Point block -> Bool) -> + MaxSlotNo -> + -- | Association list of the candidate fragments and their associated peers. + -- The candidate fragments are anchored in the current chain (not necessarily + -- at the tip; and not necessarily forking off immediately). + [(AnchoredFragment header, PeerInfo header peer extra)] -> + -- | Association list of the requests and their associated peers. There is at + -- most one accepted request; everything else is declined. Morally, this is a + -- map from peers to @'FetchDecision' ('FetchRequest' header)@ with at most + -- one @'FetchRequest' header@. + ( Maybe (FetchRequest header, PeerInfo header peer extra), + [(FetchDecline, PeerInfo header peer extra)] + ) +fetchDecisionsBulkSync + fetchDecisionPolicy + currentChain + fetchedBlocks + fetchedMaxSlotNo + candidatesAndPeers = combineWithDeclined $ do + -- Step 1: Select the candidate to sync from. This already eliminates peers + -- that have an implausible candidate. It returns the remaining candidates + -- (with their corresponding peer) as suffixes of the immutable tip. + ( theCandidate :: ChainSuffix header, + candidatesAndPeers' :: [(ChainSuffix header, PeerInfo header peer extra)] + ) <- + MaybeT $ + selectTheCandidate + fetchDecisionPolicy + currentChain + candidatesAndPeers + + -- Step 2: Filter out from the chosen candidate fragment the blocks that + -- have already been downloaded. NOTE: if not declined, @theFragments@ is + -- guaranteed to be non-empty. + theFragments :: CandidateFragments header + <- MaybeT $ dropAlreadyFetchedBlocks candidatesAndPeers' theCandidate + + -- Step 3: Select the peer to sync from. This eliminates peers that cannot + -- serve a reasonable batch of the candidate, then chooses the peer to sync + -- from, then again declines the others. 
+ ( thePeerCandidate :: ChainSuffix header, + thePeer :: PeerInfo header peer extra + ) <- + MaybeT $ selectThePeer theFragments candidatesAndPeers' + + -- Step 4: Fetch the candidate from the selected peer, potentially declining + -- it (eg. if the peer is already too busy). + MaybeT $ + makeFetchRequest + fetchDecisionPolicy + theFragments + thePeer + thePeerCandidate + where + combineWithDeclined :: forall peerInfo a. + MaybeT (WithDeclined peerInfo) (a, peerInfo) -> + ( Maybe (a, peerInfo), + [(FetchDecline, peerInfo)] + ) + combineWithDeclined = second DList.toList . runWithDeclined . runMaybeT + + dropAlreadyFetchedBlocks :: forall peerInfo. + [(ChainSuffix header, peerInfo)] -> + ChainSuffix header -> + WithDeclined peerInfo (Maybe (CandidateFragments header)) + dropAlreadyFetchedBlocks candidatesAndPeers' theCandidate = + case dropAlreadyFetched fetchedBlocks fetchedMaxSlotNo theCandidate of + Left reason -> do + tell (DList.fromList [(reason, peerInfo) | (_, peerInfo) <- candidatesAndPeers']) + pure Nothing + Right theFragments -> pure (Just theFragments) + +-- | Given a list of candidate fragments and their associated peers, select the +-- candidate to sync from. Return this fragment, the list of peers that are +-- still in race to serve it, and the list of peers that are already being +-- declined. +selectTheCandidate :: + forall header peerInfo. + ( HasHeader header + ) => + FetchDecisionPolicy header -> + -- | The current chain. + AnchoredFragment header -> + -- | The candidate fragments and their associated peers. + [(AnchoredFragment header, peerInfo)] -> + -- | The pair of: (a) a list of peers that we have decided are not right, eg. + -- because they presented us with a chain forking too deep, and (b) the + -- selected candidate that we choose to sync from and a list of peers that are + -- still in the race to serve that candidate. + WithDeclined + peerInfo + (Maybe (ChainSuffix header, [(ChainSuffix header, peerInfo)])) +selectTheCandidate + FetchDecisionPolicy {compareCandidateChains, plausibleCandidateChain} + currentChain = + separateDeclinedAndStillInRace + -- Select the suffix up to the intersection with the current chain. This can + -- eliminate candidates that fork too deep. + . selectForkSuffixes currentChain + -- Filter to keep chains the consensus layer tells us are plausible. + . filterPlausibleCandidates plausibleCandidateChain currentChain + where + -- Write all of the declined peers, and find the longest candidate + -- fragment if there is any. + separateDeclinedAndStillInRace :: + [(FetchDecision (ChainSuffix header), peerInfo)] -> + WithDeclined peerInfo (Maybe (ChainSuffix header, [(ChainSuffix header, peerInfo)])) + separateDeclinedAndStillInRace decisions = do + let (declined, inRace) = partitionEithers + [ bimap ((,p)) ((,p)) d | (d, p) <- decisions ] + tell (DList.fromList declined) + case inRace of + [] -> pure Nothing + _ : _ -> do + let maxChainOn f c0 c1 = case compareCandidateChains (f c0) (f c1) of + LT -> c1 + _ -> c0 + -- maximumBy yields the last element in case of a tie while we + -- prefer the first one + chainSfx = fst $ + List.foldl1' (maxChainOn (getChainSuffix . fst)) inRace + pure $ Just (chainSfx, inRace) + +-- | Given _the_ candidate fragment to sync from, and a list of peers (with +-- their corresponding candidate fragments), choose which peer to sync _the_ +-- candidate fragment from. 
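The tie-breaking remark in selectTheCandidate above can be checked with a small standalone example (not part of this patch): maximumBy keeps the last of several equal maxima, whereas the foldl1' used above keeps the first.

import Data.List (foldl1', maximumBy)
import Data.Ord (comparing)

-- Keep the first maximum in case of ties, mirroring the fold in
-- 'selectTheCandidate'.
firstMaximumBy :: (a -> a -> Ordering) -> [a] -> a
firstMaximumBy cmp = foldl1' (\acc x -> if cmp acc x == LT then x else acc)

-- maximumBy      (comparing snd) [("p1", 3), ("p2", 3)]  ==  ("p2", 3)
-- firstMaximumBy (comparing snd) [("p1", 3), ("p2", 3)]  ==  ("p1", 3)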
+-- +-- We first filter out all the peers that cannot even serve a reasonable batch +-- of _the_ candidate fragment, and then we choose the first one according to +-- the ordering passed as argument. +-- +-- PRECONDITION: The set of peers must be included in the peer order queue. +-- PRECONDITION: The given candidate fragments must not be empty. +selectThePeer :: + forall header peer extra. + HasHeader header => + -- | The candidate fragment that we have selected to sync from, as suffix of + -- the immutable tip. + CandidateFragments header -> + -- | Association list of candidate fragments (as suffixes of the immutable + -- tip) and their associated peers. + [(ChainSuffix header, PeerInfo header peer extra)] -> + WithDeclined + (PeerInfo header peer extra) + (Maybe (ChainSuffix header, PeerInfo header peer extra)) +selectThePeer + theFragments + candidates = do + -- Create a fetch request for the blocks in question. The request has exactly + -- 1 block. It will only be used to choose the peer to fetch from, but we will + -- later craft a more refined request for that peer. See [About the gross + -- request] in the module documentation. Because @theFragments@ is not + -- empty, and does not contain empty fragments, @grossRequest@ will not be empty. + let firstBlock = map (AF.takeOldest 1) . take 1 . filter (not . AF.null) + requestBlock = firstBlock $ snd theFragments + grossRequest = FetchRequest $ assert (all (not . AF.null) requestBlock) requestBlock + + -- Return the first peer that can serve the gross request and decline + -- the other peers. + go grossRequest candidates + where + go grossRequest (c@(candidate, peerInfo) : xs) = do + if requestHeadInCandidate candidate grossRequest then do + tell $ DList.fromList + [(FetchDeclineConcurrencyLimit FetchModeBulkSync 1, pInfo) + | (_, pInfo) <- xs + ] + pure (Just c) + else do + tell $ DList.fromList [(FetchDeclineAlreadyFetched, peerInfo)] + go grossRequest xs + go _grossRequest [] = pure Nothing + + + requestHeadInCandidate :: ChainSuffix header -> FetchRequest header -> Bool + requestHeadInCandidate candidate request = + case fetchRequestFragments request of + fragments@(_:_) + | AF.withinFragmentBounds + (AF.headPoint $ last fragments) + (getChainSuffix candidate) + -> + True + _ -> + False + +-- | Given a candidate and a peer to sync from, create a request for that +-- specific peer. We might take the 'FetchDecision' to decline the request, but +-- only for “good” reasons, eg. if the peer is already too busy. +makeFetchRequest :: + ( HasHeader header + ) => + FetchDecisionPolicy header -> + -- | The candidate fragment that we have selected to sync from, as suffix of + -- the immutable tip. + CandidateFragments header -> + -- | The peer that we have selected to sync from. + PeerInfo header peer extra -> + -- | Its candidate fragment as suffix of the immutable tip. + ChainSuffix header -> + WithDeclined + (PeerInfo header peer extra) + (Maybe (FetchRequest header, PeerInfo header peer extra)) +makeFetchRequest + fetchDecisionPolicy + theFragments + thePeer@(status, inflight, gsvs, _, _) + thePeerCandidate = + let theDecision = do + -- Drop blocks that are already in-flight with this peer. + fragments <- dropAlreadyInFlightWithPeer inflight theFragments + + -- Trim the fragments to the peer's candidate, keeping only blocks that + -- they may actually serve. + trimmedFragments <- trimFragmentsToCandidate thePeerCandidate (snd fragments) + + -- Try to create a request for those fragments. 
+ fetchRequestDecision + fetchDecisionPolicy + FetchModeBulkSync + 0 -- bypass all concurrency limits. + (calculatePeerFetchInFlightLimits gsvs) + inflight + status + (Right trimmedFragments) + in case theDecision of + Left reason -> tell (DList.fromList [(reason, thePeer)]) >> pure Nothing + Right theRequest -> pure $ Just (theRequest, thePeer) + where + trimFragmentsToCandidate candidate fragments = + let trimmedFragments = + [ prefix + | fragment <- fragments + , Just (_, prefix, _, _) <- [AF.intersect (getChainSuffix candidate) fragment] + , not (AF.null prefix) + ] + in if null trimmedFragments + then Left FetchDeclineAlreadyFetched + else Right trimmedFragments diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/Deadline.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/Deadline.hs new file mode 100644 index 00000000000..1624e8db9e5 --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/Deadline.hs @@ -0,0 +1,1037 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE TypeOperators #-} + +-- | This module contains the part of the block fetch decisions process that is +-- specific to the deadline mode. +module Ouroboros.Network.BlockFetch.Decision.Deadline where + +import Control.Exception (assert) +import Control.Monad.Class.MonadTime.SI (DiffTime) +import Data.Function (on) +import Data.Hashable +import Data.List (foldl', groupBy, sortBy, transpose) +import Data.Maybe (mapMaybe) +import Data.Set (Set) +import Data.Set qualified as Set +import GHC.Stack (HasCallStack) + +import Control.Monad (guard) + +import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..), AnchoredFragment) +import Ouroboros.Network.AnchoredFragment qualified as AF +import Ouroboros.Network.BlockFetch.DeltaQ (PeerFetchInFlightLimits (..)) +import Ouroboros.Network.Block +import Ouroboros.Network.Point (withOriginToMaybe) + +import Ouroboros.Network.BlockFetch.ClientState (FetchRequest (..), + PeerFetchInFlight (..), PeerFetchStatus (..)) +import Ouroboros.Network.BlockFetch.ConsensusInterface (FetchMode (..)) +import Ouroboros.Network.BlockFetch.DeltaQ (PeerGSV (..), SizeInBytes, calculatePeerFetchInFlightLimits, + comparePeerGSV, comparePeerGSV', estimateResponseDeadlineProbability) + + +data FetchDecisionPolicy header = FetchDecisionPolicy { + maxInFlightReqsPerPeer :: Word, -- A protocol constant. + + maxConcurrencyDeadline :: Word, + decisionLoopIntervalBulkSync :: DiffTime, + decisionLoopIntervalDeadline :: DiffTime, + peerSalt :: Int, + bulkSyncGracePeriod :: DiffTime, + + plausibleCandidateChain :: HasCallStack + => AnchoredFragment header + -> AnchoredFragment header -> Bool, + + compareCandidateChains :: HasCallStack + => AnchoredFragment header + -> AnchoredFragment header + -> Ordering, + + blockFetchSize :: header -> SizeInBytes + } + +type PeerInfo header peer extra = + ( PeerFetchStatus header, + PeerFetchInFlight header, + PeerGSV, + peer, + extra + ) + +peerInfoPeer :: PeerInfo header peer extra -> peer +peerInfoPeer (_, _, _, p, _) = p + +-- | Throughout the decision making process we accumulate reasons to decline +-- to fetch any blocks. This type is used to wrap intermediate and final +-- results. +-- +type FetchDecision result = Either FetchDecline result + +-- | All the various reasons we can decide not to fetch blocks from a peer. 
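As a purely hypothetical illustration (not part of this patch) of how the FetchDecisionPolicy record above might be filled in: the numeric values are arbitrary, and the plausibility and comparison functions are naive head-block-number checks, whereas the real ones are supplied by the consensus layer.

import Data.Function (on)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (HasHeader)
import Ouroboros.Network.BlockFetch.Decision.Deadline

-- Hypothetical policy; field values are illustrative only.
examplePolicy :: HasHeader header => FetchDecisionPolicy header
examplePolicy = FetchDecisionPolicy
  { maxInFlightReqsPerPeer       = 10
  , maxConcurrencyDeadline       = 2
  , decisionLoopIntervalBulkSync = 0.04
  , decisionLoopIntervalDeadline = 0.01
  , peerSalt                     = 0
  , bulkSyncGracePeriod          = 10
  , plausibleCandidateChain      = \cur cand -> AF.headBlockNo cand > AF.headBlockNo cur
  , compareCandidateChains       = compare `on` AF.headBlockNo
  , blockFetchSize               = const 1000
  }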
+-- +-- It is worth highlighting which of these reasons result from competition +-- among upstream peers. +-- +-- * 'FetchDeclineInFlightOtherPeer': decline this peer because all the +-- unfetched blocks of its candidate chain have already been requested from +-- other peers. This reason reflects the least-consequential competition +-- among peers: the competition that determines merely which upstream peer to +-- burden with the request (eg the one with the best +-- 'Ouroboros.Network.BlockFetch.DeltaQ.DeltaQ' metrics). The consequences +-- are relatively minor because the unfetched blocks on this peer's candidate +-- chain will be requested regardless; it's merely a question of "From who?". +-- (One exception: if an adversarial peer wins this competition such that the +-- blocks are only requested from them, then it may be possible that this +-- decision determines whether the blocks are ever /received/. But that +-- depends on details of timeouts, a longer competing chain being soon +-- received within those timeouts, and so on.) +-- +-- * 'FetchDeclineChainNotPlausible': decline this peer because the node has +-- already fetched, validated, and selected a chain better than its candidate +-- chain from other peers (or from the node's own block forge). Because the +-- node's current selection is influenced by what blocks other peers have +-- recently served (or it recently minted), this reason reflects that peers +-- /indirectly/ compete by serving as long of a chain as possible and as +-- promptly as possible. When the tips of the peers' selections are all +-- within their respective forecast horizons (see +-- 'Ouroboros.Consensus.Ledger.SupportsProtocol.ledgerViewForecastAt'), then +-- the length of their candidate chains will typically be the length of their +-- selections, since the ChainSync is free to race ahead (in contrast, the +-- BlockFetch pipeline depth is bounded such that it will, for a syncing +-- node, not be able to request all blocks between the selection and the end +-- of the forecast window). But if one or more of their tips is beyond the +-- horizon, then the relative length of the candidate chains is more +-- complicated, influenced by both the relative density of the chains' +-- suffixes and the relative age of the chains' intersection with the node's +-- selection (since each peer's forecast horizon is a fixed number of slots +-- after the candidate's successor of that intersection). +-- +-- * 'FetchDeclineConcurrencyLimit': decline this peer while the node has +-- already fully allocated the artificially scarce 'maxConcurrentFetchPeers' +-- resource amongst its other peers. This reason reflects the +-- least-fundamental competition: it's the only way a node would decline a +-- candidate chain C that it would immediately switch to if C had somehow +-- already been fetched (and any better current candidates hadn't). It is +-- possible that this peer's candidate fragment is better than the candidate +-- fragments of other peers, but that should only happen ephemerally (eg for +-- a brief while immediately after first connecting to this peer). +-- +-- * 'FetchDeclineChainIntersectionTooDeep': decline this peer because the node's +-- selection has more than @K@ blocks that are not on this peer's candidate +-- chain. Typically, this reason occurs after the node has been declined---ie +-- lost the above competitions---for a long enough duration. 
This decision +-- only arises if the BlockFetch decision logic wins a harmless race against +-- the ChainSync client once the node's selection gets longer, since +-- 'Ouroboros.Consensus.MiniProtocol.ChainSync.Client.ForkTooDeep' +-- disconnects from such a peer. +-- +data FetchDecline = + -- | This peer's candidate chain is not longer than our chain. For more + -- details see + -- 'Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface.mkBlockFetchConsensusInterface' + -- which implements 'plausibleCandidateChain'. + -- + FetchDeclineChainNotPlausible + + -- | Switching to this peer's candidate chain would require rolling back + -- more than @K@ blocks. + -- + | FetchDeclineChainIntersectionTooDeep + + -- | Every block on this peer's candidate chain has already been fetched. + -- + | FetchDeclineAlreadyFetched + + -- | This peer's candidate chain has already been requested from this + -- peer. + -- + | FetchDeclineInFlightThisPeer + + -- | Some blocks on this peer's candidate chain have not yet been fetched, + -- but all of those have already been requested from other peers. + -- + | FetchDeclineInFlightOtherPeer + + -- | This peer's BlockFetch client is shutting down, see + -- 'PeerFetchStatusShutdown'. + -- + | FetchDeclinePeerShutdown + + -- | Blockfetch is starting up and waiting on corresponding Chainsync. + | FetchDeclinePeerStarting + + + -- The reasons above this comment are fundamental and/or obvious. On the + -- other hand, the reasons below are heuristic. + + + -- | This peer is in a potentially-temporary state in which it has not + -- responded to us within a certain expected time limit, see + -- 'PeerFetchStatusAberrant'. + -- + | FetchDeclinePeerSlow + + -- | This peer is not under the 'maxInFlightReqsPerPeer' limit. + -- + -- The argument is the 'maxInFlightReqsPerPeer' constant. + -- + | FetchDeclineReqsInFlightLimit !Word + + -- | This peer is not under the 'inFlightBytesHighWatermark' bytes limit. + -- + -- The arguments are: + -- + -- * number of bytes currently in flight for that peer + -- * the configured 'inFlightBytesLowWatermark' constant + -- * the configured 'inFlightBytesHighWatermark' constant + -- + | FetchDeclineBytesInFlightLimit !SizeInBytes !SizeInBytes !SizeInBytes + + -- | This peer is not under the 'inFlightBytesLowWatermark'. + -- + -- The arguments are: + -- + -- * number of bytes currently in flight for that peer + -- * the configured 'inFlightBytesLowWatermark' constant + -- * the configured 'inFlightBytesHighWatermark' constant + -- + | FetchDeclinePeerBusy !SizeInBytes !SizeInBytes !SizeInBytes + + -- | The node is not under the 'maxConcurrentFetchPeers' limit. + -- + -- The arguments are: + -- + -- * the current 'FetchMode' + -- * the corresponding configured limit constant, either + -- 'maxConcurrencyDeadline', or 1 for bulk sync. + -- + | FetchDeclineConcurrencyLimit !FetchMode !Word + deriving (Eq, Show) + +-- | The \"oh noes?!\" operator. +-- +-- In the case of an error, the operator provides a specific error value. +-- +(?!) :: Maybe a -> e -> Either e a +Just x ?! _ = Right x +Nothing ?! e = Left e + +-- | The combination of a 'ChainSuffix' and a list of discontiguous +-- 'AnchoredFragment's: +-- +-- * When comparing two 'CandidateFragments' as candidate chains, we use the +-- 'ChainSuffix'. +-- +-- * To track which blocks of that candidate still have to be downloaded, we +-- use a list of discontiguous 'AnchoredFragment's. 
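A short usage sketch of the (?!) operator defined above, mirroring the pattern used by filterPlausibleCandidates further down; the helper name is invented for the example.

import Control.Monad (guard)
import Ouroboros.Network.BlockFetch.Decision.Deadline

-- Turn a Maybe-style check into a FetchDecision, declining with the given
-- reason when the check fails.
requireNonEmpty :: [a] -> FetchDecision [a]
requireNonEmpty xs = do
  guard (not (null xs)) ?! FetchDeclineAlreadyFetched
  return xs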
+-- +type CandidateFragments header = (ChainSuffix header, [AnchoredFragment header]) + + +fetchDecisionsDeadline + :: (Ord peer, + Hashable peer, + HasHeader header, + HeaderHash header ~ HeaderHash block) + => FetchDecisionPolicy header + -> AnchoredFragment header + -> (Point block -> Bool) + -> MaxSlotNo + -> [(AnchoredFragment header, PeerInfo header peer extra)] + -> [(FetchDecision (FetchRequest header), PeerInfo header peer extra)] + +fetchDecisionsDeadline fetchDecisionPolicy@FetchDecisionPolicy { + plausibleCandidateChain, + compareCandidateChains, + blockFetchSize, + peerSalt + } + currentChain + fetchedBlocks + fetchedMaxSlotNo = + + -- Finally, make a decision for each (chain, peer) pair. + fetchRequestDecisions fetchDecisionPolicy + . map swizzleSIG + + -- Reorder chains based on consensus policy and network timing data. + . prioritisePeerChains peerSalt compareCandidateChains blockFetchSize + . map swizzleIG + + -- Filter to keep blocks that are not already in-flight for this peer. + . dropAlreadyInFlightWithPeer' + . map swizzleI + + -- Filter to keep blocks that have not already been downloaded. + . dropAlreadyFetched' + fetchedBlocks + fetchedMaxSlotNo + + -- Select the suffix up to the intersection with the current chain. + . selectForkSuffixes + currentChain + + -- First, filter to keep chains the consensus layer tells us are plausible. + . filterPlausibleCandidates + plausibleCandidateChain + currentChain + where + -- Data swizzling functions to get the right info into each stage. + swizzleI (c, p@(_, inflight,_,_, _)) = (c, inflight, p) + swizzleIG (c, p@(_, inflight,gsvs,peer,_)) = (c, inflight, gsvs, peer, p) + swizzleSIG (c, p@(status,inflight,gsvs,peer,_)) = (c, status, inflight, gsvs, peer, p) + +{- +We have the node's /current/ or /adopted/ chain. This is the node's chain in +the sense specified by the Ouroboros algorithm. It is a fully verified chain +with block bodies and a ledger state. + + ┆ ┆ + ├───┤ + │ │ + ├───┤ + │ │ + ├───┤ + │ │ + ├───┤ + │ │ + ───┴───┴─── current chain length (block number) + +With chain selection we are interested in /candidate/ chains. We have these +candidate chains in the form of chains of verified headers, but without bodies. + +The consensus layer gives us the current set of candidate chains from our peers +and we have the task of selecting which block bodies to download, and then +passing those block bodes back to the consensus layer. The consensus layer will +try to validate them and decide if it wants to update its current chain. + + ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ + ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ + ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ + ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ + ├───┤ ├───┤ ├───┤ ├───┤ └───┘ + │ │ │ │ │ │ │ │ + ───┴───┴─────┼───┼─────┼───┼─────┼───┼───────────── current chain length + │ │ │ │ │ │ + current ├───┤ ├───┤ └───┘ + (blocks) │ │ │ │ + └───┘ └───┘ + A B C D + candidates + (headers) + +In this example we have four candidate chains, with all but chain D strictly +longer than our current chain. + +In general there are many candidate chains. We make a distinction between a +candidate chain and the peer from which it is available. It is often the +case that the same chain is available from multiple peers. We will try to be +clear about when we are referring to chains or the combination of a chain and +the peer from which it is available. + +For the sake of the example let us assume we have the four chains above +available from the following peers. 
+ +peer 1 2 3 4 5 6 7 + ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ + ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ + ├───┤ ├───┤ ├───┤ ├───┤ └───┘ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ │ │ + ──┼───┼─────┼───┼─────┼───┼─────┼───┼───────────────┼───┼─────┼───┼── + │ │ │ │ │ │ │ │ │ │ │ │ + └───┘ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ + └───┘ └───┘ └───┘ └───┘ └───┘ +chain C A B A D B A + +This is the form in which we are informed about candidate chains from the +consensus layer, the combination of a chain and the peer it is from. This +makes sense, since these things change independently. + +We will process the chains in this form, keeping the peer/chain combination all +the way through. Although there could in principle be some opportunistic saving +by sharing when multiple peers provide the same chain, taking advantage of this +adds complexity and does nothing to improve our worst case costs. + +We are only interested in candidate chains that are strictly longer than our +current chain. So our first task is to filter down to this set. +-} + +-- | Keep only those candidate chains that are preferred over the current +-- chain. Typically, this means that their length is longer than the length of +-- the current chain. +-- +filterPlausibleCandidates + :: (AnchoredFragment block -> AnchoredFragment header -> Bool) + -> AnchoredFragment block -- ^ The current chain + -> [(AnchoredFragment header, peerinfo)] + -> [(FetchDecision (AnchoredFragment header), peerinfo)] +filterPlausibleCandidates plausibleCandidateChain currentChain chains = + [ (chain', peer) + | (chain, peer) <- chains + , let chain' = do + guard (plausibleCandidateChain currentChain chain) + ?! FetchDeclineChainNotPlausible + return chain + ] + +{- +In the example, this leaves us with only the candidate chains: A, B and C, but +still paired up with the various peers. + + +peer 1 2 3 4 6 7 + ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ + ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ │ │ + ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ │ │ + ──┼───┼─────┼───┼─────┼───┼─────┼───┼───────────────┼───┼─────┼───┼── + │ │ │ │ │ │ │ │ │ │ │ │ + └───┘ ├───┤ ├───┤ ├───┤ ├───┤ ├───┤ + │ │ │ │ │ │ │ │ │ │ + └───┘ └───┘ └───┘ └───┘ └───┘ +chain C A B A B A +-} + +{- +Of course we would at most need to download the blocks in a candidate chain +that are not already in the current chain. So we must find those intersections. + +Before we do that, lets define how we represent a suffix of a chain. We do this +very simply as a chain fragment: exactly those blocks contained in the suffix. +A chain fragment is of course not a chain, but has many similar invariants. + +We will later also need to represent chain ranges when we send block fetch +requests. We do this using a pair of points: the first and last blocks in the +range. While we can represent an empty chain fragment, we cannot represent an +empty fetch range, but this is ok since we never request empty ranges. + + Chain fragment + ┌───┐ + │ ◉ │ Start of range, inclusive + ├───┤ + │ │ + ├───┤ + │ │ + ├───┤ + │ │ + ├───┤ + │ ◉ │ End of range, inclusive. + └───┘ +-} + +-- | A chain suffix, obtained by intersecting a candidate chain with the +-- current chain. +-- +-- The anchor point of a 'ChainSuffix' will be a point within the bounds of +-- the current chain ('AF.withinFragmentBounds'), indicating that it forks off +-- in the last @K@ blocks. +-- +-- A 'ChainSuffix' must be non-empty, as an empty suffix, i.e. 
the candidate +-- chain is equal to the current chain, would not be a plausible candidate. +newtype ChainSuffix header = + ChainSuffix { getChainSuffix :: AnchoredFragment header } + +{- +We define the /chain suffix/ as the suffix of the candidate chain up until (but +not including) where it intersects the current chain. + + + current peer 1 peer 2 + + ┆ ┆ + ├───┤ + │ ◀┿━━━━━━━━━━━━━━━━━┓ + ├───┤ ┌─╂─┐ + │ │ │ ◉ │ + ├───┤ ├───┤ + │ │ │ │ + ├───┤ ├───┤ + │ ◀┿━━━━━━━┓ │ │ + ───┴───┴─────┬─╂─┬─────┼───┼─── + │ ◉ │ │ │ + └───┘ ├───┤ + │ ◉ │ + └───┘ + C A + +In this example we found that C was a strict extension of the current chain +and chain A was a short fork. + +Note that it's possible that we don't find any intersection within the last K +blocks. This means the candidate forks by more than K and so we are not +interested in this candidate at all. +-} + +-- | Find the chain suffix for a candidate chain, with respect to the +-- current chain. +-- +chainForkSuffix + :: (HasHeader header, HasHeader block, + HeaderHash header ~ HeaderHash block) + => AnchoredFragment block -- ^ Current chain. + -> AnchoredFragment header -- ^ Candidate chain + -> Maybe (ChainSuffix header) +chainForkSuffix current candidate = + case AF.intersect current candidate of + Nothing -> Nothing + Just (_, _, _, candidateSuffix) -> + -- If the suffix is empty, it means the candidate chain was equal to + -- the current chain and didn't fork off. Such a candidate chain is + -- not a plausible candidate, so it must have been filtered out. + assert (not (AF.null candidateSuffix)) $ + Just (ChainSuffix candidateSuffix) + +selectForkSuffixes + :: (HasHeader header, HasHeader block, + HeaderHash header ~ HeaderHash block) + => AnchoredFragment block + -> [(FetchDecision (AnchoredFragment header), peerinfo)] + -> [(FetchDecision (ChainSuffix header), peerinfo)] +selectForkSuffixes current chains = + [ (mchain', peer) + | (mchain, peer) <- chains + , let mchain' = do + chain <- mchain + chainForkSuffix current chain ?! FetchDeclineChainIntersectionTooDeep + ] + +{- +We define the /fetch range/ as the suffix of the fork range that has not yet +had its blocks downloaded and block content checked against the headers. + + ┆ ┆ + ├───┤ + │ │ + ├───┤ ┌───┐ + │ │ already │ │ + ├───┤ fetched ├───┤ + │ │ blocks │ │ + ├───┤ ├───┤ + │ │ │░◉░│ ◄ fetch range + ───┴───┴─────┬───┬─────┼───┼─── + │░◉░│ ◄ │░░░│ + └───┘ ├───┤ + │░◉░│ ◄ + └───┘ + +In earlier versions of this scheme we maintained and relied on the invariant +that the ranges of fetched blocks are backwards closed. This meant we never had +discontinuous ranges of fetched or not-yet-fetched blocks. This invariant does +simplify things somewhat by keeping the ranges continuous however it precludes +fetching ranges of blocks from different peers in parallel. + +We do not maintain any such invariant and so we have to deal with there being +gaps in the ranges we have already fetched or are yet to fetch. To keep the +tracking simple we do not track the ranges themselves, rather we track the set +of individual blocks without their relationship to each other. + +-} + +-- | Find the fragments of the chain suffix that we still need to fetch because +-- they are covering blocks that have not yet been fetched. +-- +-- Typically this is a single fragment forming a suffix of the chain, but in +-- the general case we can get a bunch of discontiguous chain fragments. +-- +-- See also 'dropAlreadyInFlightWithPeer'. 
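Both functions below filter their fragments via filterWithMaxSlotNo, which in turn relies on AF.filterWithStop. A rough, list-based picture of that optimisation follows (a sketch only, not the real fragment-based implementation): filtering proceeds element by element until the stop condition fires, after which the whole remainder is kept unfiltered.

-- List-based sketch of the "filter with stop" idea; the real
-- 'AF.filterWithStop' works on anchored fragments and returns the kept
-- blocks as a list of discontiguous fragments.
filterWithStopList :: (a -> Bool) -> (a -> Bool) -> [a] -> [a]
filterWithStopList keep stopHere = go
  where
    go [] = []
    go (x:xs)
      | stopHere x = x : xs      -- stop condition: keep the rest as-is
      | keep x     = x : go xs
      | otherwise  = go xs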
+dropAlreadyFetched :: + (HasHeader header, HeaderHash header ~ HeaderHash block) => + (Point block -> Bool) -> + MaxSlotNo -> + ChainSuffix header -> + FetchDecision (CandidateFragments header) +dropAlreadyFetched alreadyDownloaded fetchedMaxSlotNo candidate = + if null fragments + then Left FetchDeclineAlreadyFetched + else Right (candidate, fragments) + where + fragments = filterWithMaxSlotNo notAlreadyFetched fetchedMaxSlotNo (getChainSuffix candidate) + notAlreadyFetched = not . alreadyDownloaded . castPoint . blockPoint + +dropAlreadyFetched' :: + (HasHeader header, HeaderHash header ~ HeaderHash block) => + (Point block -> Bool) -> + MaxSlotNo -> + [(FetchDecision (ChainSuffix header), peerinfo)] -> + [(FetchDecision (CandidateFragments header), peerinfo)] +dropAlreadyFetched' alreadyDownloaded fetchedMaxSlotNo = + map + ( \(mcandidate, peer) -> + ((dropAlreadyFetched alreadyDownloaded fetchedMaxSlotNo =<< mcandidate), peer) + ) + +-- | Find the fragments of the chain suffix that we still need to fetch because +-- they are covering blocks that are not currently in the process of being +-- fetched from this peer. +-- +-- Typically this is a single fragment forming a suffix of the chain, but in +-- the general case we can get a bunch of discontiguous chain fragments. +-- +-- See also 'dropAlreadyFetched' +dropAlreadyInFlightWithPeer :: + (HasHeader header) => + PeerFetchInFlight header -> + CandidateFragments header -> + FetchDecision (CandidateFragments header) +dropAlreadyInFlightWithPeer inflight (candidate, chainfragments) = + if null fragments + then Left FetchDeclineInFlightThisPeer + else Right (candidate, fragments) + where + fragments = concatMap (filterWithMaxSlotNo notAlreadyInFlight (peerFetchMaxSlotNo inflight)) chainfragments + notAlreadyInFlight b = blockPoint b `Set.notMember` peerFetchBlocksInFlight inflight + +dropAlreadyInFlightWithPeer' :: + (HasHeader header) => + [(FetchDecision (CandidateFragments header), PeerFetchInFlight header, peerinfo)] -> + [(FetchDecision (CandidateFragments header), peerinfo)] +dropAlreadyInFlightWithPeer' = + map + ( \(mcandidatefragments, inflight, peer) -> + ((dropAlreadyInFlightWithPeer inflight =<< mcandidatefragments), peer) + ) + +-- | Filter a fragment. This is an optimised variant that will behave the same +-- as 'AnchoredFragment.filter' if the following precondition is satisfied: +-- +-- PRECONDITION: for all @hdr@ in the chain fragment: if @blockSlot hdr > +-- maxSlotNo@ then the predicate should not hold for any header after @hdr@ in +-- the chain fragment. +-- +-- For example, when filtering out already downloaded blocks from the +-- fragment, it does not make sense to keep filtering after having encountered +-- the highest slot number the ChainDB has seen so far: blocks with a greater +-- slot number cannot have been downloaded yet. When the candidate fragments +-- get far ahead of the current chain, e.g., @2k@ headers, this optimisation +-- avoids the linear cost of filtering these headers when we know in advance +-- they will all remain in the final fragment. In case the given slot number +-- is 'NoSlotNo', no filtering takes place, as there should be no matches +-- because we haven't downloaded any blocks yet. +-- +-- For example, when filtering out blocks already in-flight for the given +-- peer, the given @maxSlotNo@ can correspond to the block with the highest +-- slot number that so far has been in-flight for the given peer. 
When no +-- blocks have been in-flight yet, @maxSlotNo@ can be 'NoSlotNo', in which +-- case no filtering needs to take place, which makes sense, as there are no +-- blocks to filter out. Note that this is conservative: if a block is for +-- some reason multiple times in-flight (maybe it has to be redownloaded) and +-- the block's slot number matches the @maxSlotNo@, it will now be filtered +-- (while the filtering might previously have stopped before encountering the +-- block in question). This is fine, as the filter will now include the block, +-- because according to the filtering predicate, the block is not in-flight. +filterWithMaxSlotNo + :: forall header. HasHeader header + => (header -> Bool) + -> MaxSlotNo -- ^ @maxSlotNo@ + -> AnchoredFragment header + -> [AnchoredFragment header] +filterWithMaxSlotNo p maxSlotNo = + AF.filterWithStop p ((> maxSlotNo) . MaxSlotNo . blockSlot) + +prioritisePeerChains + :: forall extra header peer. + ( HasHeader header + , Hashable peer + , Ord peer + ) + => Int + -> (AnchoredFragment header -> AnchoredFragment header -> Ordering) + -> (header -> SizeInBytes) + -> [(FetchDecision (CandidateFragments header), PeerFetchInFlight header, + PeerGSV, + peer, + extra )] + -> [(FetchDecision [AnchoredFragment header], extra)] +prioritisePeerChains salt compareCandidateChains blockFetchSize = + map (\(decision, peer) -> + (fmap (\(_,_,fragment) -> fragment) decision, peer)) + . concatMap ( concat + . transpose + . groupBy (equatingFst + (equatingRight + ((==) `on` chainHeadPoint))) + . sortBy (comparingFst + (comparingRight + (compare `on` chainHeadPoint))) + ) + . groupBy (equatingFst + (equatingRight + (equatingPair + -- compare on probability band first, then preferred chain + (==) + (equateCandidateChains `on` getChainSuffix) + `on` + (\(band, chain, _fragments) -> (band, chain))))) + . sortBy (descendingOrder + (comparingFst + (comparingRight + (comparingPair + -- compare on probability band first, then preferred chain + compare + (compareCandidateChains `on` getChainSuffix) + `on` + (\(band, chain, _fragments) -> (band, chain)))))) + . map annotateProbabilityBand + . 
+  . sortBy (\(_,_,a,ap,_) (_,_,b,bp,_) ->
+      comparePeerGSV' salt (a,ap) (b,bp))
+  where
+    annotateProbabilityBand (Left decline, _, _, _, peer) = (Left decline, peer)
+    annotateProbabilityBand (Right (chain,fragments), inflight, gsvs, _, peer) =
+        (Right (band, chain, fragments), peer)
+      where
+        band = probabilityBand $
+                 estimateResponseDeadlineProbability
+                   gsvs
+                   (peerFetchBytesInFlight inflight)
+                   (totalFetchSize blockFetchSize fragments)
+                   deadline
+
+        deadline = 2 -- seconds -- TODO: get this from external info
+
+    equateCandidateChains chain1 chain2
+      | EQ <- compareCandidateChains chain1 chain2 = True
+      | otherwise                                  = False
+
+    chainHeadPoint (_,ChainSuffix c,_) = AF.headPoint c
+
+totalFetchSize :: (header -> SizeInBytes)
+               -> [AnchoredFragment header]
+               -> SizeInBytes
+totalFetchSize blockFetchSize fragments =
+  sum [ blockFetchSize header
+      | fragment <- fragments
+      , header   <- AF.toOldestFirst fragment ]
+
+type Comparing a = a -> a -> Ordering
+type Equating  a = a -> a -> Bool
+
+descendingOrder :: Comparing a -> Comparing a
+descendingOrder cmp = flip cmp
+
+comparingPair :: Comparing a -> Comparing b -> Comparing (a, b)
+comparingPair cmpA cmpB (a1, b1) (a2, b2) = cmpA a1 a2 <> cmpB b1 b2
+
+equatingPair :: Equating a -> Equating b -> Equating (a, b)
+equatingPair eqA eqB (a1, b1) (a2, b2) = eqA a1 a2 && eqB b1 b2
+
+comparingEither :: Comparing a -> Comparing b -> Comparing (Either a b)
+comparingEither _    _    (Left  _) (Right _) = LT
+comparingEither cmpA _    (Left  x) (Left  y) = cmpA x y
+comparingEither _    cmpB (Right x) (Right y) = cmpB x y
+comparingEither _    _    (Right _) (Left  _) = GT
+
+equatingEither :: Equating a -> Equating b -> Equating (Either a b)
+equatingEither _   _   (Left  _) (Right _) = False
+equatingEither eqA _   (Left  x) (Left  y) = eqA x y
+equatingEither _   eqB (Right x) (Right y) = eqB x y
+equatingEither _   _   (Right _) (Left  _) = False
+
+comparingFst :: Comparing a -> Comparing (a, b)
+comparingFst cmp = cmp `on` fst
+
+equatingFst :: Equating a -> Equating (a, b)
+equatingFst eq = eq `on` fst
+
+comparingRight :: Comparing b -> Comparing (Either a b)
+comparingRight = comparingEither mempty
+
+equatingRight :: Equating b -> Equating (Either a b)
+equatingRight = equatingEither (\_ _ -> True)
+
+-- | Given the probability of the download completing within the deadline,
+-- classify that into one of three broad bands: high, moderate and low.
+--
+-- The bands are
+--
+-- * high:     98% -- 100%
+-- * moderate: 75% --  98%
+-- * low:       0% --  75%
+--
+probabilityBand :: Double -> ProbabilityBand
+probabilityBand p
+  | p > 0.98  = ProbabilityHigh
+  | p > 0.75  = ProbabilityModerate
+  | otherwise = ProbabilityLow
+ -- TODO: for hysteresis, increase probability if we're already using this peer
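+ -- For instance (an illustrative evaluation): @probabilityBand 0.99@ is
+ -- 'ProbabilityHigh', @probabilityBand 0.80@ is 'ProbabilityModerate' and
+ -- @probabilityBand 0.50@ is 'ProbabilityLow'.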
+
+data ProbabilityBand = ProbabilityLow
+                     | ProbabilityModerate
+                     | ProbabilityHigh
+  deriving (Eq, Ord, Show)
+
+
+{-
+In the second phase we walk over the prioritised fetch suffixes for each peer
+and make a decision about whether we should initiate any new fetch requests.
+
+This decision is based on a number of factors:
+
+ * Is the fetch suffix empty? If so, there's nothing to do.
+ * Do we already have block fetch requests in flight with this peer?
+ * If so are we under the maximum number of in-flight blocks for this peer?
+ * Is this peer still performing within expectations or has it missed any soft
+   time outs?
+ * Has the peer missed any hard timeouts or otherwise been disconnected?
+ * Are we at our soft or hard limit of the number of peers we are prepared to
+   fetch blocks from concurrently?
+
+We look at each peer chain fetch suffix one by one. Of course decisions we
+make earlier can affect decisions later, in particular the number of peers we
+fetch from concurrently can increase if we fetch from a new peer, and we must
+obviously take that into account when considering later peer chains.
+-}
+
+
+fetchRequestDecisions
+  :: forall extra header peer.
+     ( Hashable peer
+     , HasHeader header
+     , Ord peer
+     )
+  => FetchDecisionPolicy header
+  -> [( FetchDecision [AnchoredFragment header]
+      , PeerFetchStatus header
+      , PeerFetchInFlight header
+      , PeerGSV
+      , peer
+      , extra)]
+  -> [(FetchDecision (FetchRequest header), extra)]
+fetchRequestDecisions fetchDecisionPolicy chains =
+    go nConcurrentFetchPeers0 Set.empty NoMaxSlotNo chains
+  where
+    go :: Word
+       -> Set (Point header)
+       -> MaxSlotNo
+       -> [(Either FetchDecline [AnchoredFragment header],
+            PeerFetchStatus header, PeerFetchInFlight header, PeerGSV, peer, extra)]
+       -> [(FetchDecision (FetchRequest header), extra)]
+    go !_ !_ !_ [] = []
+    go !nConcurrentFetchPeers !blocksFetchedThisRound !maxSlotNoFetchedThisRound
+       ((mchainfragments, status, inflight, gsvs, peer, extra) : cps) =
+
+        (decision, extra)
+      : go nConcurrentFetchPeers' blocksFetchedThisRound'
+           maxSlotNoFetchedThisRound' cps
+      where
+        decision = fetchRequestDecision
+                     fetchDecisionPolicy
+                     FetchModeDeadline
+                     -- Permit the preferred peers to bypass any concurrency limits.
+                     (if elem peer nPreferedPeers then 0
+                      else nConcurrentFetchPeers)
+                     (calculatePeerFetchInFlightLimits gsvs)
+                     inflight
+                     status
+                     mchainfragments
+
+        nConcurrentFetchPeers'
+          -- increment if it was idle, and now will not be
+          | peerFetchReqsInFlight inflight == 0
+          , Right{} <- decision = nConcurrentFetchPeers + 1
+          | otherwise           = nConcurrentFetchPeers
+
+        -- This is only for avoiding duplication between fetch requests in this
+        -- round of decisions. Avoiding duplication with blocks that are already
+        -- in flight is handled by dropAlreadyInFlightWithOtherPeers
+        (blocksFetchedThisRound', maxSlotNoFetchedThisRound') =
+          case decision of
+            Left _ ->
+              (blocksFetchedThisRound, maxSlotNoFetchedThisRound)
+            Right (FetchRequest fragments) ->
+              (blocksFetchedThisRound `Set.union` blocksFetchedThisDecision,
+               maxSlotNoFetchedThisRound `max` maxSlotNoFetchedThisDecision)
+              where
+                maxSlotNoFetchedThisDecision =
+                  foldl' max NoMaxSlotNo $ map MaxSlotNo $
+                  mapMaybe (withOriginToMaybe . AF.headSlot) fragments
+
+                blocksFetchedThisDecision =
+                  Set.fromList
+                    [ blockPoint header
+                    | fragment <- fragments
+                    , header   <- AF.toOldestFirst fragment ]
+
+    nConcurrentFetchPeers0 = fromIntegral $ Set.size nActivePeers
+
+    -- Set of peers with outstanding requests in flight.
+    nActivePeers :: Set peer
+    nActivePeers =
+        Set.fromList
+      . map snd
+      . filter (\(inFlight, _) -> inFlight > 0)
+      . map (\(_, _, PeerFetchInFlight{peerFetchReqsInFlight}, _, p, _) ->
+               (peerFetchReqsInFlight, p))
+      $ chains
+
+    -- Order the peers based on current PeerGSV. The top performing peers will
+    -- be permitted to go active even if we're above the desired
+    -- maxConcurrentFetchPeers, which will cause us to switch smoothly from
+    -- slower to faster peers. When switching from slow to faster peers we will
+    -- be over the configured limit, but PeerGSV is expected to be updated
+    -- rather infrequently, so the set of preferred peers should be stable
+    -- over tens of seconds.
+    nPreferedPeers :: [peer]
+    nPreferedPeers =
+        map snd
+      . take (fromIntegral $ maxConcurrencyDeadline fetchDecisionPolicy)
+      . sortBy (\a b -> comparePeerGSV nActivePeers (peerSalt fetchDecisionPolicy) a b)
+      . map (\(_, _, _, gsv, p, _) -> (gsv, p))
+      $ chains
+
+-- |
+--
+-- This function _does not_ check if the peer is likely to have the blocks in
+-- the ranges; it only computes a request that respects what the peer's
+-- current status indicates about its ability to fulfill it.
+fetchRequestDecision
+  :: HasHeader header
+  => FetchDecisionPolicy header
+  -> FetchMode
+  -> Word
+     -- ^ Number of concurrent fetch peers. Can be set to @0@ to bypass
+     -- concurrency limits.
+  -> PeerFetchInFlightLimits
+  -> PeerFetchInFlight header
+  -> PeerFetchStatus header
+  -> FetchDecision [AnchoredFragment header]
+  -> FetchDecision (FetchRequest header)
+
+fetchRequestDecision _ _ _ _ _ _ (Left decline)
+  = Left decline
+
+fetchRequestDecision _ _ _ _ _ PeerFetchStatusShutdown _
+  = Left FetchDeclinePeerShutdown
+
+fetchRequestDecision _ _ _ _ _ PeerFetchStatusStarting _
+  = Left FetchDeclinePeerStarting
+
+fetchRequestDecision _ _ _ _ _ PeerFetchStatusAberrant _
+  = Left FetchDeclinePeerSlow
+
+fetchRequestDecision FetchDecisionPolicy {
+                       maxConcurrencyDeadline,
+                       maxInFlightReqsPerPeer,
+                       blockFetchSize
+                     }
+                     fetchMode
+                     nConcurrentFetchPeers
+                     PeerFetchInFlightLimits {
+                       inFlightBytesLowWatermark,
+                       inFlightBytesHighWatermark
+                     }
+                     PeerFetchInFlight {
+                       peerFetchReqsInFlight,
+                       peerFetchBytesInFlight
+                     }
+                     peerFetchStatus
+                     (Right fetchFragments)
+
+  | peerFetchReqsInFlight >= maxInFlightReqsPerPeer
+  = Left $ FetchDeclineReqsInFlightLimit
+             maxInFlightReqsPerPeer
+
+  | peerFetchBytesInFlight >= inFlightBytesHighWatermark
+  = Left $ FetchDeclineBytesInFlightLimit -- FIXME: this one should be maybe not too bad.
+             peerFetchBytesInFlight
+             inFlightBytesLowWatermark
+             inFlightBytesHighWatermark
+
+    -- This covers the case when we could still fit in more reqs or bytes, but
+    -- we want to let it drop below a low water mark before sending more so we
+    -- get a bit more batching behaviour, rather than lots of 1-block reqs.
+  | peerFetchStatus == PeerFetchStatusBusy
+  = Left $ FetchDeclinePeerBusy -- FIXME: also not too bad
+             peerFetchBytesInFlight
+             inFlightBytesLowWatermark
+             inFlightBytesHighWatermark
+
+    -- Refuse any block request if we're above the concurrency limit.
+  | let maxConcurrentFetchPeers = case fetchMode of
+                                    FetchModeBulkSync -> 1
+                                    FetchModeDeadline -> maxConcurrencyDeadline
+  , nConcurrentFetchPeers > maxConcurrentFetchPeers
+  = Left $ FetchDeclineConcurrencyLimit
+             fetchMode maxConcurrentFetchPeers
+
+    -- If we're at the concurrency limit refuse any additional peers.
+  | peerFetchReqsInFlight == 0
+  , let maxConcurrentFetchPeers = case fetchMode of
+                                    FetchModeBulkSync -> 1
+                                    FetchModeDeadline -> maxConcurrencyDeadline
+  , nConcurrentFetchPeers == maxConcurrentFetchPeers
+  = Left $ FetchDeclineConcurrencyLimit
+             fetchMode maxConcurrentFetchPeers
+
+    -- We've checked our request limit and our byte limit. We are then
+    -- guaranteed to get at least one non-empty request range.
+  | otherwise
+  = assert (peerFetchReqsInFlight < maxInFlightReqsPerPeer) $
+    assert (not (null fetchFragments)) $
+
+    Right $ selectBlocksUpToLimits
+              blockFetchSize
+              peerFetchReqsInFlight
+              maxInFlightReqsPerPeer
+              peerFetchBytesInFlight
+              inFlightBytesHighWatermark
+              fetchFragments
+
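+-- For instance (an illustrative trace through the guards above): in
+-- 'FetchModeBulkSync' the concurrency limit is 1, so while one peer already
+-- has requests in flight, an idle peer is declined with
+-- 'FetchDeclineConcurrencyLimit'.
+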
+-- |
+--
+-- Precondition: we are strictly below the per-peer limits on requests and
+-- bytes in flight, and the list of fragments is non-empty (this is asserted
+-- in the function body).
+--
+-- Property: result is non-empty if preconditions satisfied
+--
+selectBlocksUpToLimits
+  :: forall header. HasHeader header
+  => (header -> SizeInBytes) -- ^ Block body size
+  -> Word -- ^ Current number of requests in flight
+  -> Word -- ^ Maximum number of requests in flight allowed
+  -> SizeInBytes -- ^ Current number of bytes in flight
+  -> SizeInBytes -- ^ Maximum number of bytes in flight allowed
+  -> [AnchoredFragment header]
+  -> FetchRequest header
+selectBlocksUpToLimits blockFetchSize nreqs0 maxreqs nbytes0 maxbytes fragments =
+    assert (nreqs0 < maxreqs && nbytes0 < maxbytes && not (null fragments)) $
+    -- The case that we are already over our limits has to be checked earlier,
+    -- outside of this function. From here on however we check for limits.
+
+    let fragments' = goFrags nreqs0 nbytes0 fragments in
+    assert (all (not . AF.null) fragments') $
+    FetchRequest fragments'
+  where
+    goFrags :: Word
+            -> SizeInBytes
+            -> [AnchoredFragment header] -> [AnchoredFragment header]
+    goFrags _     _      []     = []
+    goFrags nreqs nbytes (c:cs)
+      | nreqs+1  > maxreqs = []
+      | otherwise          = goFrag (nreqs+1) nbytes (Empty (AF.anchor c)) c cs
+      -- Each time we have to pick from a new discontiguous chain fragment then
+      -- that will become a new request, which contributes to our in-flight
+      -- request count. We never break the maxreqs limit.
+
+    goFrag :: Word
+           -> SizeInBytes
+           -> AnchoredFragment header
+           -> AnchoredFragment header
+           -> [AnchoredFragment header] -> [AnchoredFragment header]
+    goFrag nreqs nbytes c' (Empty _) cs = c' : goFrags nreqs nbytes cs
+    goFrag nreqs nbytes c' (b :< c)  cs
+      | nbytes' >= maxbytes = [c' :> b]
+      | otherwise           = goFrag nreqs nbytes' (c' :> b) c cs
+      where
+        nbytes' = nbytes + blockFetchSize b
+      -- Note that we always pick the one last block that crosses the maxbytes
+      -- limit. This covers the case where we otherwise wouldn't even be able
+      -- to request a single block, as it's too large.
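+      -- For instance (illustrative numbers): with @maxbytes = 100@, already
+      -- @nbytes = 90@ in flight and a next block of size 30, we reach
+      -- 120 >= 100 and return just that one block, rather than an empty
+      -- request.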
diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/Trace.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/Trace.hs new file mode 100644 index 00000000000..e32316def20 --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision/Trace.hs @@ -0,0 +1,11 @@ + +module Ouroboros.Network.BlockFetch.Decision.Trace where + +import Ouroboros.Network.BlockFetch.ClientState (TraceLabelPeer) +import Ouroboros.Network.Block (Point) +import Ouroboros.Network.BlockFetch.Decision.Deadline (FetchDecision) + +data TraceDecisionEvent peer header + = PeersFetch [TraceLabelPeer peer (FetchDecision [Point header])] + | PeerStarvedUs peer + deriving (Show) diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/State.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/State.hs index 6f072fe035d..799e9396197 100644 --- a/ouroboros-network/src/Ouroboros/Network/BlockFetch/State.hs +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/State.hs @@ -21,9 +21,12 @@ import Data.Functor.Contravariant (contramap) import Data.Hashable (Hashable) import Data.Map.Strict (Map) import Data.Map.Strict qualified as Map +import Data.Sequence (Seq (Empty)) import Data.Set qualified as Set import Data.Void +import qualified Control.Monad.Class.MonadSTM.Internal as Internal.TVar +import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked (newTVarIO, StrictTVar, readTVarIO, writeTVar) import Control.Exception (assert) import Control.Monad.Class.MonadSTM import Control.Monad.Class.MonadTime.SI @@ -37,32 +40,41 @@ import Ouroboros.Network.Block import Ouroboros.Network.BlockFetch.ClientState (FetchClientStateVars (..), FetchRequest (..), PeerFetchInFlight (..), PeerFetchStatus (..), TraceFetchClientState (..), TraceLabelPeer (..), addNewFetchRequest, - readFetchClientState) + readFetchClientState, PeersOrder (..)) import Ouroboros.Network.BlockFetch.Decision (FetchDecision, FetchDecisionPolicy (..), FetchDecline (..), FetchMode (..), PeerInfo, fetchDecisions) import Ouroboros.Network.BlockFetch.DeltaQ (PeerGSV (..)) - +import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation) +import Ouroboros.Network.BlockFetch.Decision.Trace fetchLogicIterations :: ( HasHeader header , HasHeader block , HeaderHash header ~ HeaderHash block , MonadDelay m - , MonadSTM m + , MonadTimer m , Ord peer , Hashable peer ) - => Tracer m [TraceLabelPeer peer (FetchDecision [Point header])] + => Tracer m (TraceDecisionEvent peer header) -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header)) -> FetchDecisionPolicy header -> FetchTriggerVariables peer header m -> FetchNonTriggerVariables peer header block m + -> (peer -> m ()) -- ^ Action to call to demote the dynamo of ChainSync jumping. 
-> m Void fetchLogicIterations decisionTracer clientStateTracer fetchDecisionPolicy fetchTriggerVariables - fetchNonTriggerVariables = + fetchNonTriggerVariables + demoteCSJDynamo = do + + peersOrderVar <- newTVarIO $ PeersOrder { + peersOrderCurrent = Nothing, + peersOrderStart = Time 0, + peersOrderAll = Empty + } iterateForever initialFetchStateFingerprint $ \stateFingerprint -> do @@ -71,17 +83,21 @@ fetchLogicIterations decisionTracer clientStateTracer -- + wait for the state to change and make decisions for the new state -- + act on those decisions start <- getMonotonicTime - stateFingerprint' <- fetchLogicIteration + (stateFingerprint', fetchMode) <- fetchLogicIteration decisionTracer clientStateTracer fetchDecisionPolicy fetchTriggerVariables fetchNonTriggerVariables stateFingerprint + (peersOrderVar, demoteCSJDynamo) end <- getMonotonicTime let delta = diffTime end start + loopInterval = case fetchMode of + FetchModeBulkSync -> decisionLoopIntervalBulkSync fetchDecisionPolicy + FetchModeDeadline -> decisionLoopIntervalDeadline fetchDecisionPolicy -- Limit decision is made once every decisionLoopInterval. - threadDelay $ decisionLoopInterval fetchDecisionPolicy - delta - return stateFingerprint' + threadDelay (loopInterval - delta) + pure stateFingerprint' iterateForever :: Monad m => a -> (a -> m a) -> m Void @@ -97,46 +113,61 @@ iterateForever x0 m = go x0 where go x = m x >>= go -- * deciding for each peer if we will initiate a new fetch request -- fetchLogicIteration - :: (Hashable peer, MonadSTM m, Ord peer, + :: (Hashable peer, Ord peer, HasHeader header, HasHeader block, - HeaderHash header ~ HeaderHash block) - => Tracer m [TraceLabelPeer peer (FetchDecision [Point header])] + HeaderHash header ~ HeaderHash block, + MonadTimer m) + => Tracer m (TraceDecisionEvent peer header) -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header)) -> FetchDecisionPolicy header -> FetchTriggerVariables peer header m -> FetchNonTriggerVariables peer header block m -> FetchStateFingerprint peer header block - -> m (FetchStateFingerprint peer header block) + -> (StrictTVar m (PeersOrder peer), peer -> m ()) + -> m (FetchStateFingerprint peer header block, FetchMode) fetchLogicIteration decisionTracer clientStateTracer fetchDecisionPolicy fetchTriggerVariables fetchNonTriggerVariables - stateFingerprint = do + stateFingerprint + (peersOrderVar, demoteCSJDynamo) = do -- Gather a snapshot of all the state we need. - (stateSnapshot, stateFingerprint') <- + -- + -- The grace period is considered to retrigger the decision logic even + -- if no state has changed. This can help downloading blocks from a + -- different peer if all ChainSync clients are blocked on the forecast + -- horizon and the current peer of BlockFetch is not sending blocks. + gracePeriodTVar <- registerDelay (bulkSyncGracePeriod fetchDecisionPolicy) + (stateSnapshot, gracePeriodExpired, stateFingerprint') <- atomically $ readStateVariables fetchTriggerVariables fetchNonTriggerVariables + gracePeriodTVar stateFingerprint + peersOrder <- readTVarIO peersOrderVar -- TODO: allow for boring PeerFetchStatusBusy transitions where we go round -- again rather than re-evaluating everything. 
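+    -- The fingerprint may be unchanged when the wake-up came solely from the
+    -- grace-period timer registered above, hence the extra disjunct in the
+    -- assertion below.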
- assert (stateFingerprint' /= stateFingerprint) $ return () + assert (gracePeriodExpired || stateFingerprint' /= stateFingerprint) $ return () -- TODO: log the difference in the fingerprint that caused us to wake up -- Make all the fetch decisions - let decisions = fetchDecisionsForStateSnapshot + decisions <- fetchDecisionsForStateSnapshot + decisionTracer fetchDecisionPolicy stateSnapshot + (peersOrder, + atomically . writeTVar peersOrderVar, + demoteCSJDynamo) -- If we want to trace timings, we can do it here after forcing: -- _ <- evaluate (force decisions) -- Trace the batch of fetch decisions - traceWith decisionTracer + traceWith decisionTracer $ PeersFetch [ TraceLabelPeer peer (fmap fetchRequestPoints decision) | (decision, (_, _, _, peer, _)) <- decisions ] @@ -147,7 +178,7 @@ fetchLogicIteration decisionTracer clientStateTracer let !stateFingerprint'' = updateFetchStateFingerprintPeerStatus statusUpdates stateFingerprint' - return stateFingerprint'' + return (stateFingerprint'', fetchStateFetchMode stateSnapshot) where swizzleReqVar (d,(_,_,g,_,(rq,p))) = (d,g,rq,p) @@ -165,14 +196,21 @@ fetchDecisionsForStateSnapshot :: (HasHeader header, HeaderHash header ~ HeaderHash block, Ord peer, - Hashable peer) - => FetchDecisionPolicy header + Hashable peer, + MonadMonotonicTime m) + => Tracer m (TraceDecisionEvent peer header) + -> FetchDecisionPolicy header -> FetchStateSnapshot peer header block m - -> [( FetchDecision (FetchRequest header), + -> ( PeersOrder peer + , PeersOrder peer -> m () + , peer -> m () + ) + -> m [( FetchDecision (FetchRequest header), PeerInfo header peer (FetchClientStateVars m header, peer) )] fetchDecisionsForStateSnapshot + tracer fetchDecisionPolicy FetchStateSnapshot { fetchStateCurrentChain, @@ -181,8 +219,10 @@ fetchDecisionsForStateSnapshot fetchStatePeerGSVs, fetchStateFetchedBlocks, fetchStateFetchedMaxSlotNo, - fetchStateFetchMode - } = + fetchStateFetchMode, + fetchStateChainSelStarvation + } + peersOrderHandlers = assert ( Map.keysSet fetchStatePeerChains `Set.isSubsetOf` Map.keysSet fetchStatePeerStates) $ @@ -190,11 +230,14 @@ fetchDecisionsForStateSnapshot `Set.isSubsetOf` Map.keysSet fetchStatePeerGSVs) $ fetchDecisions + tracer fetchDecisionPolicy fetchStateFetchMode fetchStateCurrentChain fetchStateFetchedBlocks fetchStateFetchedMaxSlotNo + fetchStateChainSelStarvation + peersOrderHandlers peerChainsAndPeerInfo where peerChainsAndPeerInfo = @@ -255,7 +298,8 @@ data FetchNonTriggerVariables peer header block m = FetchNonTriggerVariables { readStatePeerStateVars :: STM m (Map peer (FetchClientStateVars m header)), readStatePeerGSVs :: STM m (Map peer PeerGSV), readStateFetchMode :: STM m FetchMode, - readStateFetchedMaxSlotNo :: STM m MaxSlotNo + readStateFetchedMaxSlotNo :: STM m MaxSlotNo, + readStateChainSelStarvation :: STM m ChainSelStarvation } @@ -298,7 +342,8 @@ data FetchStateSnapshot peer header block m = FetchStateSnapshot { fetchStatePeerGSVs :: Map peer PeerGSV, fetchStateFetchedBlocks :: Point block -> Bool, fetchStateFetchMode :: FetchMode, - fetchStateFetchedMaxSlotNo :: MaxSlotNo + fetchStateFetchedMaxSlotNo :: MaxSlotNo, + fetchStateChainSelStarvation :: ChainSelStarvation } readStateVariables :: (MonadSTM m, Eq peer, @@ -306,17 +351,21 @@ readStateVariables :: (MonadSTM m, Eq peer, HeaderHash header ~ HeaderHash block) => FetchTriggerVariables peer header m -> FetchNonTriggerVariables peer header block m + -> Internal.TVar.TVar m Bool -> FetchStateFingerprint peer header block -> STM m (FetchStateSnapshot peer header 
block m, + Bool, FetchStateFingerprint peer header block) readStateVariables FetchTriggerVariables{..} FetchNonTriggerVariables{..} + gracePeriodTVar fetchStateFingerprint = do -- Read all the trigger state variables fetchStateCurrentChain <- readStateCurrentChain fetchStatePeerChains <- readStateCandidateChains fetchStatePeerStatus <- readStatePeerStatus + gracePeriodExpired <- Internal.TVar.readTVar gracePeriodTVar -- Construct the change detection fingerprint let !fetchStateFingerprint' = @@ -326,7 +375,7 @@ readStateVariables FetchTriggerVariables{..} fetchStatePeerStatus -- Check the fingerprint changed, or block and wait until it does - check (fetchStateFingerprint' /= fetchStateFingerprint) + check (gracePeriodExpired || fetchStateFingerprint' /= fetchStateFingerprint) -- Now read all the non-trigger state variables fetchStatePeerStates <- readStatePeerStateVars @@ -335,7 +384,7 @@ readStateVariables FetchTriggerVariables{..} fetchStateFetchedBlocks <- readStateFetchedBlocks fetchStateFetchMode <- readStateFetchMode fetchStateFetchedMaxSlotNo <- readStateFetchedMaxSlotNo - + fetchStateChainSelStarvation <- readStateChainSelStarvation -- Construct the overall snapshot of the state let fetchStateSnapshot = @@ -346,7 +395,8 @@ readStateVariables FetchTriggerVariables{..} fetchStatePeerGSVs, fetchStateFetchedBlocks, fetchStateFetchMode, - fetchStateFetchedMaxSlotNo + fetchStateFetchedMaxSlotNo, + fetchStateChainSelStarvation } - return (fetchStateSnapshot, fetchStateFingerprint') + return (fetchStateSnapshot, gracePeriodExpired, fetchStateFingerprint') diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/Configuration.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/Configuration.hs index c71e30ddddf..17d4dd69727 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/Configuration.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/Configuration.hs @@ -31,7 +31,7 @@ module Ouroboros.Network.Diffusion.Configuration import System.Random (randomRIO) -import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..)) +import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..), GenesisBlockFetchConfiguration (..)) import Ouroboros.Network.ConnectionManager.Core (defaultProtocolIdleTimeout, defaultResetTimeout, defaultTimeWaitTimeout) import Ouroboros.Network.Diffusion (P2P (..)) @@ -90,12 +90,16 @@ defaultPeerSharing = PeerSharingDisabled -- | Configuration for FetchDecisionPolicy. defaultBlockFetchConfiguration :: Int -> BlockFetchConfiguration defaultBlockFetchConfiguration bfcSalt = - BlockFetchConfiguration { - bfcMaxConcurrencyBulkSync = 1, - bfcMaxConcurrencyDeadline = 1, - bfcMaxRequestsInflight = fromIntegral $ blockFetchPipeliningMax defaultMiniProtocolParameters, - bfcDecisionLoopInterval = 0.01, -- 10ms - bfcSalt } + BlockFetchConfiguration + { bfcMaxConcurrencyDeadline = 1 + , bfcMaxRequestsInflight = fromIntegral $ blockFetchPipeliningMax defaultMiniProtocolParameters + , bfcDecisionLoopIntervalBulkSync = 0.04 -- 40ms + , bfcDecisionLoopIntervalDeadline = 0.01 -- 10ms + , bfcGenesisBFConfig = GenesisBlockFetchConfiguration + { gbfcBulkSyncGracePeriod = 10 -- seconds + } + , bfcSalt + } defaultChainSyncTimeout :: IO ChainSyncTimeout defaultChainSyncTimeout = do