Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ library
Ouroboros.Consensus.Node.Exit
Ouroboros.Consensus.Node.ExitPolicy
Ouroboros.Consensus.Node.GSM
Ouroboros.Consensus.Node.GSM.PeerState
Ouroboros.Consensus.Node.Genesis
Ouroboros.Consensus.Node.Recovery
Ouroboros.Consensus.Node.RethrowPolicy
Expand Down Expand Up @@ -97,8 +98,10 @@ library
random,
resource-registry ^>=0.1,
safe-wild-cards ^>=1.0,
semialign,
serialise ^>=0.2,
text,
these,
time,
transformers,
typed-protocols:{stateful, typed-protocols},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
( ObjectDiffusionInboundStateView
, bracketObjectDiffusionInbound
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert
Expand Down Expand Up @@ -214,6 +218,7 @@ data Handlers m addr blk = Handlers
, hPerasCertDiffusionClient ::
NodeToNodeVersion ->
ControlMessageSTM m ->
ObjectDiffusionInboundStateView m ->
ConnectionId addr ->
PerasCertDiffusionInboundPipelined blk m ()
, hPerasCertDiffusionServer ::
Expand Down Expand Up @@ -316,7 +321,7 @@ mkHandlers
(mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool)
(getMempoolWriter getMempool)
version
, hPerasCertDiffusionClient = \version controlMessageSTM peer ->
, hPerasCertDiffusionClient = \version controlMessageSTM state peer ->
objectDiffusionInbound
(contramap (TraceLabelPeer peer) (Node.perasCertDiffusionInboundTracer tracers))
( perasCertDiffusionMaxFifoLength miniProtocolParameters
Expand All @@ -326,6 +331,7 @@ mkHandlers
(makePerasCertPoolWriterFromChainDB $ getChainDB)
version
controlMessageSTM
state
, hPerasCertDiffusionServer = \version peer ->
objectDiffusionOutbound
(contramap (TraceLabelPeer peer) (Node.perasCertDiffusionOutboundTracer tracers))
Expand Down Expand Up @@ -864,17 +870,22 @@ mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucke
}
channel = do
labelThisThread "PerasCertDiffusionClient"
((), trailing) <-
runPipelinedPeerWithLimits
(TraceLabelPeer them `contramap` tPerasCertDiffusionTracer)
(cPerasCertDiffusionCodec (mkCodecs version))
blPerasCertDiffusion
timeLimitsObjectDiffusion
channel
( objectDiffusionInboundPeerPipelined
(hPerasCertDiffusionClient version controlMessageSTM them)
)
return (NoInitiatorResult, trailing)
bracketObjectDiffusionInbound
version
(getPerasCertDiffusionHandles kernel)
them
$ \state -> do
((), trailing) <-
runPipelinedPeerWithLimits
(TraceLabelPeer them `contramap` tPerasCertDiffusionTracer)
(cPerasCertDiffusionCodec (mkCodecs version))
blPerasCertDiffusion
timeLimitsObjectDiffusion
channel
( objectDiffusionInboundPeerPipelined
(hPerasCertDiffusionClient version controlMessageSTM state them)
)
return (NoInitiatorResult, trailing)

aPerasCertDiffusionServer ::
NodeToNodeVersion ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry
import qualified Ouroboros.Consensus.Ledger.Basics as L
import Ouroboros.Consensus.Node.GsmState
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import Ouroboros.Consensus.Util.NormalForm.StrictTVar (StrictTVar)
import qualified Ouroboros.Consensus.Util.NormalForm.StrictTVar as StrictSTM
import System.FS.API
( HasFS
, createDirectoryIfMissing
Expand Down Expand Up @@ -97,7 +95,7 @@ data CandidateVersusSelection
WhetherCandidateIsBetter !Bool
deriving (Eq, Show)

data GsmView m upstreamPeer selection chainSyncState = GsmView
data GsmView m upstreamPeer selection peerState = GsmView
{ antiThunderingHerd :: Maybe StdGen
-- ^ An initial seed used to randomly increase 'minCaughtUpDuration' by up
-- to 15% every transition from Syncing to CaughtUp, in order to avoid a
Expand All @@ -108,13 +106,13 @@ data GsmView m upstreamPeer selection chainSyncState = GsmView
STM
m
( selection ->
chainSyncState ->
peerState ->
CandidateVersusSelection
)
-- ^ Whether the candidate from the @chainSyncState@ is preferable to the
-- selection. This can depend on external state (Peras certificates boosting
-- blocks).
, peerIsIdle :: chainSyncState -> Bool
, peerIsIdle :: peerState -> Bool
, durationUntilTooOld :: Maybe (selection -> m DurationFromNow)
-- ^ How long from now until the selection will be so old that the node
-- should exit the @CaughtUp@ state
Expand All @@ -123,10 +121,8 @@ data GsmView m upstreamPeer selection chainSyncState = GsmView
, equivalent :: selection -> selection -> Bool
-- ^ Whether the two selections are equivalent for the purpose of the
-- Genesis State Machine
, getChainSyncStates ::
STM m (Map.Map upstreamPeer (StrictTVar m chainSyncState))
-- ^ The current ChainSync state with the latest candidates from the
-- upstream peers
, getPeerStates :: STM m (Map.Map upstreamPeer peerState)
-- ^ The current peer state with the latest candidates from the upstream peers
, getCurrentSelection :: STM m selection
-- ^ The node's current selection
, minCaughtUpDuration :: NominalDiffTime
Expand Down Expand Up @@ -244,7 +240,7 @@ realGsmEntryPoints tracerArgs gsmView =
, peerIsIdle
, durationUntilTooOld
, equivalent
, getChainSyncStates
, getPeerStates
, getCurrentSelection
, minCaughtUpDuration
, setCaughtUpPersistentMark
Expand Down Expand Up @@ -370,12 +366,13 @@ realGsmEntryPoints tracerArgs gsmView =

blockUntilCaughtUp :: STM m (TraceGsmEvent tracedSelection)
blockUntilCaughtUp = do
-- STAGE 1: all ChainSync clients report no subsequent headers
varsState <- getChainSyncStates
states <- traverse StrictSTM.readTVar varsState
-- STAGE 1: all peers are idle, which means that
-- * all ChainSync clients report no subsequent headers, and
-- * all PerasCertDiffusion clients report no subsequent certificates
peerStates <- getPeerStates
check $
not (Map.null states)
&& all peerIsIdle states
not (Map.null peerStates)
&& all peerIsIdle peerStates

-- STAGE 2: no candidate is better than the node's current
-- selection
Expand All @@ -388,16 +385,15 @@ realGsmEntryPoints tracerArgs gsmView =
-- block; general Praos reasoning ensures that won't take particularly
-- long.
selection <- getCurrentSelection
candidates <- traverse StrictSTM.readTVar varsState
candidateOverSelection <- getCandidateOverSelection
let ok candidate =
WhetherCandidateIsBetter False
== candidateOverSelection selection candidate
check $ all ok candidates
check $ all ok peerStates

pure $
GsmEventEnterCaughtUp
(Map.size states)
(Map.size peerStates)
(cnvSelection selection)

-- STAGE 3: the previous stages weren't so slow that the idler
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
module Ouroboros.Consensus.Node.GSM.PeerState
( GsmPeerState (..)
, maybeChainSyncState
, maybePerasCertDiffusionState
, mkGsmPeerStates
, gsmPeerIsIdle
)
where

import Cardano.Base.FeatureFlags (CardanoFeatureFlag (..))
import Data.Align (Semialign (..))
import Data.Map.Strict (Map)
import Data.Set (Set)
import Data.These (These (..))
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State
( ChainSyncClientHandle (..)
, ChainSyncClientHandleCollection (..)
, ChainSyncState (..)
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
( ObjectDiffusionInboundHandle (..)
, ObjectDiffusionInboundHandleCollection (..)
, ObjectDiffusionInboundState (..)
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert (PerasCertDiffusionInboundState)
import Ouroboros.Consensus.Util.IOLike (MonadSTM (..), readTVar)
import Ouroboros.Network.NodeToNode.Version (isPerasEnabled)

-- | State about peers we are connected to during initialization.
newtype GsmPeerState blk = GsmPeerState
{ unGsmPeerState ::
These
(ChainSyncState blk)
(PerasCertDiffusionInboundState blk)
}

-- | Retrieve the 'ChainSync' state of this peer, if such a connection is established.
maybeChainSyncState :: GsmPeerState blk -> Maybe (ChainSyncState blk)
maybeChainSyncState (GsmPeerState these) =
case these of
This csState -> Just csState
That _ -> Nothing
These csState _ -> Just csState

-- | Retrieve the 'PerasCertDiffusion' state of this peer, if such a connection is established.
maybePerasCertDiffusionState :: GsmPeerState blk -> Maybe (PerasCertDiffusionInboundState blk)
maybePerasCertDiffusionState (GsmPeerState these) =
case these of
This _ -> Nothing
That pcdState -> Just pcdState
These _ pcdState -> Just pcdState

-- | Construct a 'GsmPeerState' for all peers we are connected to.
mkGsmPeerStates ::
(Ord peer, MonadSTM m) =>
ChainSyncClientHandleCollection peer m blk ->
ObjectDiffusionInboundHandleCollection peer m blk ->
STM m (Map peer (GsmPeerState blk))
mkGsmPeerStates csHandles pcdHandles = do
csPeerStates <- traverse (readTVar . cschState) =<< cschcMap csHandles
pcdPeerStates <- traverse (readTVar . odihState) =<< odihcMap pcdHandles
pure (GsmPeerState <$> align csPeerStates pcdPeerStates)

-- | Determine whether our connections to this peer are idle.
gsmPeerIsIdle :: Set CardanoFeatureFlag -> GsmPeerState blk -> Bool
gsmPeerIsIdle featureFlags (GsmPeerState these) =
case these of
-- We have both ChainSync and PerasCertDiffusion connections => idle if both are idling
These csState pcdState -> csIdling csState && odisIdling pcdState
-- Only a ChainSync connection is available => idle if the ChainSync connection is idling
This csState | not (perasIsEnabled csState) -> csIdling csState
-- We will soon establish a PerasCertDiffusion connection => not idling
This _ -> False
-- We will soon establish a ChainSync connection => not idling
That _ -> False
where
-- Is the Peras feature flag enabled and the peer is compatible with it?
perasIsEnabled csState = isPerasEnabled featureFlags (csNodeToNodeVersion csState)
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module Ouroboros.Consensus.NodeKernel
, toConsensusMode
) where

import Cardano.Base.FeatureFlags (CardanoFeatureFlag)
import Cardano.Base.FeatureFlags (CardanoFeatureFlag (..))
import Cardano.Network.ConsensusMode (ConsensusMode (..))
import Cardano.Network.PeerSelection.Bootstrap (UseBootstrapPeers)
import Cardano.Network.PeerSelection.LocalRootPeers
Expand All @@ -50,7 +50,7 @@ import Data.Functor ((<&>))
import Data.Hashable (Hashable)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NE
import Data.Maybe (isJust, mapMaybe)
import Data.Maybe (isJust, isNothing, mapMaybe)
import Data.Proxy
import Data.Set (Set)
import qualified Data.Text as Text
Expand Down Expand Up @@ -82,8 +82,16 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
( SomeHeaderInFutureCheck
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
( ObjectDiffusionInboundHandleCollection (..)
, newObjectDiffusionInboundHandleCollection
)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert
( PerasCertDiffusionInboundHandleCollection
)
import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..))
import qualified Ouroboros.Consensus.Node.GSM as GSM
import Ouroboros.Consensus.Node.GSM.PeerState (gsmPeerIsIdle, maybeChainSyncState, mkGsmPeerStates)
import Ouroboros.Consensus.Node.Genesis
( GenesisNodeKernelArgs (..)
, LoEAndGDDConfig (..)
Expand Down Expand Up @@ -175,6 +183,9 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel
-- from it with 'GSM.gsmStateToLedgerJudgement'.
, getChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
-- ^ The kill handle and exposed state for each ChainSync client.
, getPerasCertDiffusionHandles ::
ObjectDiffusionInboundHandleCollection (ConnectionId addrNTN) m blk
-- ^ The exposed state for each Peras CertDiffusion client.
, getPeerSharingRegistry :: PeerSharingRegistry addrNTN m
-- ^ Read the current peer sharing registry, used for interacting with
-- the PeerSharing protocol
Expand Down Expand Up @@ -235,6 +246,7 @@ initNodeKernel
args@NodeKernelArgs
{ registry
, cfg
, featureFlags
, tracers
, chainDB
, initChainDB
Expand All @@ -257,6 +269,7 @@ initNodeKernel
, mempool
, peerSharingRegistry
, varChainSyncHandles
, varPerasCertDiffusionHandles
, varGsmState
} = st

Expand All @@ -275,24 +288,34 @@ initNodeKernel
GSM.GsmView
{ GSM.antiThunderingHerd = Just gsmAntiThunderingHerd
, GSM.getCandidateOverSelection = do
weights <- ChainDB.getPerasWeightSnapshot chainDB
pure $ \(headers, _lst) state ->
case AF.intersectionPoint headers (csCandidate state) of
Nothing -> GSM.CandidateDoesNotIntersect
Just{} ->
GSM.WhetherCandidateIsBetter $ -- precondition requires intersection
preferAnchoredCandidate
(configBlock cfg)
(forgetFingerprint weights)
headers
(csCandidate state)
, GSM.peerIsIdle = csIdling
weights <- forgetFingerprint <$> ChainDB.getPerasWeightSnapshot chainDB
pure $ \(headers, _lst) peerState -> do
case csCandidate <$> maybeChainSyncState peerState of
Just candidate
-- The candidate does not intersect with our current chain.
-- This is a precondition for 'WhetherCandidateIsBetter'.
| isNothing (AF.intersectionPoint headers candidate) ->
GSM.CandidateDoesNotIntersect
-- The candidate is better than our current chain.
| preferAnchoredCandidate (configBlock cfg) weights headers candidate ->
GSM.WhetherCandidateIsBetter True
-- The candidate is not better than our current chain.
| otherwise ->
GSM.WhetherCandidateIsBetter False
Nothing ->
-- We don't have an established ChainSync connection with this peer.
-- We conservatively assume that its candidate is not better than ours.
GSM.WhetherCandidateIsBetter False
, GSM.peerIsIdle = gsmPeerIsIdle featureFlags
, GSM.durationUntilTooOld =
gsmDurationUntilTooOld
<&> \wd (_headers, lst) ->
GSM.getDurationUntilTooOld wd (getTipSlot lst)
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
, GSM.getChainSyncStates = fmap cschState <$> cschcMap varChainSyncHandles
, GSM.getPeerStates =
mkGsmPeerStates
varChainSyncHandles
varPerasCertDiffusionHandles
, GSM.getCurrentSelection = do
headers <- ChainDB.getCurrentChainWithTime chainDB
extLedgerState <- ChainDB.getCurrentLedger chainDB
Expand Down Expand Up @@ -369,6 +392,7 @@ initNodeKernel
, getFetchMode = readFetchMode blockFetchInterface
, getGsmState = readTVar varGsmState
, getChainSyncHandles = varChainSyncHandles
, getPerasCertDiffusionHandles = varPerasCertDiffusionHandles
, getPeerSharingRegistry = peerSharingRegistry
, getTracers = tracers
, setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a
Expand Down Expand Up @@ -419,6 +443,8 @@ data InternalState m addrNTN addrNTC blk = IS
BlockFetchConsensusInterface (ConnectionId addrNTN) (HeaderWithTime blk) blk m
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (HeaderWithTime blk) blk m
, varChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
, varPerasCertDiffusionHandles ::
PerasCertDiffusionInboundHandleCollection (ConnectionId addrNTN) m blk
, varGsmState :: StrictTVar m GSM.GsmState
, mempool :: Mempool m blk
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
Expand Down Expand Up @@ -457,6 +483,8 @@ initInternalState
newTVarIO gsmState

varChainSyncHandles <- atomically newChainSyncClientHandleCollection
varPerasCertDiffusionHandles <- atomically newObjectDiffusionInboundHandleCollection

mempool <-
openMempool
registry
Expand Down
Loading