Skip to content

Commit 0a6b93a

Browse files
Expose big ledger peers to dmq node and add ocert counter
1 parent 736a524 commit 0a6b93a

File tree

7 files changed

+107
-88
lines changed

7 files changed

+107
-88
lines changed

dmq-node/app/Main.hs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
{-# LANGUAGE DataKinds #-}
12
{-# LANGUAGE DisambiguateRecordFields #-}
23
{-# LANGUAGE MultiWayIf #-}
4+
{-# LANGUAGE OverloadedRecordDot #-}
35
{-# LANGUAGE OverloadedStrings #-}
46
{-# LANGUAGE ScopedTypeVariables #-}
57
{-# LANGUAGE TemplateHaskell #-}
@@ -8,13 +10,14 @@
810

911
module Main where
1012

11-
import Control.Exception (throwIO)
13+
import Control.Concurrent.Class.MonadSTM.Strict
1214
import Control.Monad (void, when)
1315
import Control.Tracer (Tracer (..), nullTracer, traceWith)
1416

1517
import Data.Act
1618
import Data.Aeson (ToJSON)
1719
import Data.Functor.Contravariant ((>$<))
20+
import Data.List.NonEmpty (NonEmpty)
1821
import Data.Maybe (maybeToList)
1922
import Data.Text qualified as Text
2023
import Data.Text.IO qualified as Text
@@ -47,6 +50,7 @@ import DMQ.Diffusion.PeerSelection (policy)
4750
import DMQ.NodeToClient.LocalStateQueryClient
4851
import DMQ.Protocol.SigSubmission.Validate
4952
import Ouroboros.Network.Diffusion qualified as Diffusion
53+
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
5054
import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress,
5155
encodeRemoteAddress)
5256
import Ouroboros.Network.SizeInBytes
@@ -121,7 +125,8 @@ runDMQ commandLineConfig = do
121125
dmqConfig
122126
psRng
123127
mkStakePoolMonitor $ \nodeKernel -> do
124-
dmqDiffusionConfiguration <- mkDiffusionConfiguration dmqConfig nt
128+
dmqDiffusionConfiguration <-
129+
mkDiffusionConfiguration dmqConfig nt (nodeKernel.stakePools.ledgerBigPeersVar)
125130

126131
let sigSize :: Sig StandardCrypto -> SizeInBytes
127132
sigSize _ = 0 -- TODO
@@ -163,6 +168,12 @@ runDMQ commandLineConfig = do
163168
(if localHandshakeTracer
164169
then WithEventType "Handshake" >$< tracer
165170
else nullTracer)
171+
$ maybe [] out <$> (tryReadTMVar $ nodeKernel.stakePools.ledgerPeersVar)
172+
where
173+
out :: LedgerPeerSnapshot AllLedgerPeers
174+
-> [(PoolStake, NonEmpty LedgerRelayAccessPoint)]
175+
out (LedgerAllPeerSnapshotV23 _pt _magic relays) = relays
176+
166177
dmqDiffusionApplications =
167178
diffusionApplications nodeKernel
168179
dmqConfig

dmq-node/dmq-node.cabal

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,7 @@ library
105105
containers >=0.5 && <0.8,
106106
contra-tracer >=0.1 && <0.3,
107107
deepseq >=1.0 && <1.6,
108-
directory,
109108
dns >=1.0 && <4.3,
110-
filepath,
111109
generic-data,
112110
hashable >=1.0 && <1.6,
113111
io-classes:{io-classes, si-timers, strict-mvar, strict-stm} ^>=1.8.0.1,
@@ -151,6 +149,7 @@ executable dmq-node
151149
cardano-ledger-core,
152150
contra-tracer >=0.1 && <0.3,
153151
dmq-node,
152+
io-classes:strict-stm,
154153
kes-agent-crypto,
155154
optparse-applicative,
156155
ouroboros-network:{ouroboros-network, api, framework},

dmq-node/src/DMQ/Configuration.hs

Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE DataKinds #-}
12
{-# LANGUAGE DeriveGeneric #-}
23
{-# LANGUAGE DerivingVia #-}
34
{-# LANGUAGE FlexibleContexts #-}
@@ -31,7 +32,7 @@ module DMQ.Configuration
3132
, LocalAddress (..)
3233
) where
3334

34-
import Control.Concurrent.Class.MonadSTM (MonadSTM (..))
35+
import Control.Concurrent.Class.MonadSTM.Strict
3536
import Control.Monad.Class.MonadThrow
3637
import Control.Monad.Class.MonadTime.SI (DiffTime)
3738
import Data.Act
@@ -48,11 +49,8 @@ import Data.Text (Text)
4849
import Data.Text qualified as Text
4950
import Generic.Data (gmappend, gmempty)
5051
import GHC.Generics (Generic)
51-
import GHC.Stack (HasCallStack)
5252
import Network.Socket (AddrInfo (..), AddrInfoFlag (..), PortNumber,
5353
SocketType (..), defaultHints, getAddrInfo)
54-
import System.Directory qualified as Directory
55-
import System.FilePath qualified as FilePath
5654
import System.IO.Error (isDoesNotExistError)
5755
import Text.Read (readMaybe)
5856

@@ -69,14 +67,13 @@ import Ouroboros.Network.OrphanInstances ()
6967
import Ouroboros.Network.PeerSelection.Governor.Types
7068
(PeerSelectionTargets (..), makePublicPeerSelectionStateVar)
7169
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
72-
(LedgerPeerSnapshot (..))
70+
(LedgerPeerSnapshot (..), LedgerPeersKind (..))
7371
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
7472
import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
7573
import Ouroboros.Network.Snocket (LocalAddress (..), RemoteAddress)
7674
import Ouroboros.Network.TxSubmission.Inbound.V2 (TxDecisionPolicy (..))
7775

78-
import DMQ.Configuration.Topology (NoExtraConfig (..), NoExtraFlags (..),
79-
readPeerSnapshotFileOrError)
76+
import DMQ.Configuration.Topology (NoExtraConfig (..), NoExtraFlags (..))
8077

8178
-- | Configuration comes in two flavours paramemtrised by `f` functor:
8279
-- `PartialConfig` is using `Last` and `Configuration` is using an identity
@@ -471,16 +468,15 @@ readConfigurationFileOrError nc =
471468
pure
472469

473470
mkDiffusionConfiguration
474-
:: HasCallStack
475-
=> Configuration
471+
:: Configuration
476472
-> NetworkTopology NoExtraConfig NoExtraFlags
473+
-> StrictTVar IO (Maybe (LedgerPeerSnapshot BigLedgerPeers))
477474
-> IO (Diffusion.Configuration NoExtraFlags IO ntnFd RemoteAddress ntcFd LocalAddress)
478475
mkDiffusionConfiguration
479476
Configuration {
480477
dmqcIPv4 = I ipv4
481478
, dmqcIPv6 = I ipv6
482479
, dmqcLocalAddress = I localAddress
483-
, dmqcTopologyFile = I topologyFile
484480
, dmqcPortNumber = I portNumber
485481
, dmqcDiffusionMode = I diffusionMode
486482
, dmqcAcceptedConnectionsLimit = I acceptedConnectionsLimit
@@ -497,8 +493,8 @@ mkDiffusionConfiguration
497493
}
498494
nt@NetworkTopology {
499495
useLedgerPeers
500-
, peerSnapshotPath
501-
} = do
496+
}
497+
ledgerBigPeersVar = do
502498
case (ipv4, ipv6) of
503499
(Nothing, Nothing) ->
504500
throwIO NoAddressInformation
@@ -526,12 +522,6 @@ mkDiffusionConfiguration
526522
localRootsVar <- newTVarIO localRoots
527523
publicRootsVar <- newTVarIO publicRoots
528524
useLedgerVar <- newTVarIO useLedgerPeers
529-
ledgerPeerSnapshotPathVar <- newTVarIO peerSnapshotPath
530-
topologyDir <- FilePath.takeDirectory <$> Directory.makeAbsolute topologyFile
531-
ledgerPeerSnapshotVar <- newTVarIO =<< updateLedgerPeerSnapshot
532-
topologyDir
533-
(readTVar ledgerPeerSnapshotPathVar)
534-
(const . pure $ ())
535525

536526
return $
537527
Diffusion.Configuration {
@@ -553,7 +543,7 @@ mkDiffusionConfiguration
553543
}
554544
, Diffusion.dcReadLocalRootPeers = readTVar localRootsVar
555545
, Diffusion.dcReadPublicRootPeers = readTVar publicRootsVar
556-
, Diffusion.dcReadLedgerPeerSnapshot = readTVar ledgerPeerSnapshotVar
546+
, Diffusion.dcReadLedgerPeerSnapshot = readTVar ledgerBigPeersVar
557547
, Diffusion.dcPeerSharing = peerSharing
558548
, Diffusion.dcReadUseLedgerPeers = readTVar useLedgerVar
559549
, Diffusion.dcProtocolIdleTimeout = protocolIdleTimeout
@@ -570,23 +560,6 @@ mkDiffusionConfiguration
570560
, addrSocketType = Stream
571561
}
572562

573-
updateLedgerPeerSnapshot :: HasCallStack
574-
=> FilePath
575-
-> STM IO (Maybe FilePath)
576-
-> (Maybe LedgerPeerSnapshot -> STM IO ())
577-
-> IO (Maybe LedgerPeerSnapshot)
578-
updateLedgerPeerSnapshot topologyDir readLedgerPeerPath writeVar = do
579-
mPeerSnapshotFile <- atomically readLedgerPeerPath
580-
mLedgerPeerSnapshot <- case mPeerSnapshotFile of
581-
Nothing -> pure Nothing
582-
Just peerSnapshotFile | FilePath.isRelative peerSnapshotFile -> do
583-
peerSnapshotFile' <- Directory.makeAbsolute $ topologyDir FilePath.</> peerSnapshotFile
584-
Just <$> readPeerSnapshotFileOrError peerSnapshotFile'
585-
Just peerSnapshotFile ->
586-
Just <$> readPeerSnapshotFileOrError peerSnapshotFile
587-
atomically . writeVar $ mLedgerPeerSnapshot
588-
pure mLedgerPeerSnapshot
589-
590563

591564
-- TODO: review this once we know what is the size of a `Sig`.
592565
-- TODO: parts of should be configurable
@@ -608,5 +581,3 @@ data ConfigurationError =
608581

609582
instance Exception ConfigurationError where
610583
displayException NoAddressInformation = "no ipv4 or ipv6 address specified, use --host-addr or --host-ipv6-addr"
611-
612-

dmq-node/src/DMQ/Configuration/Topology.hs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE DataKinds #-}
12
{-# LANGUAGE DeriveGeneric #-}
23
{-# LANGUAGE FlexibleInstances #-}
34
{-# LANGUAGE NamedFieldPuns #-}
@@ -16,7 +17,6 @@ import Data.Text qualified as Text
1617
import Ouroboros.Network.Diffusion.Topology (NetworkTopology (..))
1718
import Ouroboros.Network.OrphanInstances (localRootPeersGroupsFromJSON,
1819
networkTopologyFromJSON, networkTopologyToJSON)
19-
import Ouroboros.Network.PeerSelection.LedgerPeers.Type (LedgerPeerSnapshot)
2020
import System.Exit (die)
2121

2222
data NoExtraConfig = NoExtraConfig
@@ -68,29 +68,3 @@ readTopologyFileOrError nc =
6868
readTopologyFile nc
6969
>>= either (die . Text.unpack)
7070
pure
71-
72-
readPeerSnapshotFile :: FilePath -> IO (Either Text LedgerPeerSnapshot)
73-
readPeerSnapshotFile psf = do
74-
eBs <- try $ BS.readFile psf
75-
case eBs of
76-
Left e -> return . Left $ handler e
77-
Right bs ->
78-
let bs' = LBS.fromStrict bs in
79-
case eitherDecode bs' of
80-
Left err -> return $ Left (handlerJSON err)
81-
Right t -> return $ Right t
82-
where
83-
handler :: IOException -> Text
84-
handler e = Text.pack $ "DMQ.Topology.readLedgerPeerSnapshotFile: "
85-
++ displayException e
86-
handlerJSON :: String -> Text
87-
handlerJSON err = Text.unlines
88-
[ "snapshot file parging error:"
89-
, Text.pack err
90-
]
91-
92-
readPeerSnapshotFileOrError :: FilePath -> IO LedgerPeerSnapshot
93-
readPeerSnapshotFileOrError psf =
94-
readPeerSnapshotFile psf
95-
>>= either (die . Text.unpack)
96-
pure

dmq-node/src/DMQ/Diffusion/Arguments.hs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import Control.Monad.Class.MonadST (MonadST)
2222
import Control.Monad.Class.MonadThrow (MonadCatch)
2323
import Control.Monad.Class.MonadTimer.SI (MonadDelay, MonadTimer)
2424
import Control.Tracer (Tracer)
25+
import Data.List.NonEmpty (NonEmpty)
2526
import Network.DNS (Resolver)
2627
import Network.Socket (Socket)
2728

@@ -35,7 +36,7 @@ import Ouroboros.Network.PeerSelection.Churn (peerChurnGovernor)
3536
import Ouroboros.Network.PeerSelection.Governor.Types
3637
(ExtraGuardedDecisions (..), PeerSelectionGovernorArgs (..))
3738
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
38-
(LedgerPeersConsensusInterface (..))
39+
(LedgerPeersConsensusInterface (..), PoolStake, LedgerRelayAccessPoint)
3940
import Ouroboros.Network.PeerSelection.RelayAccessPoint (SRVPrefix)
4041
import Ouroboros.Network.PeerSelection.Types (nullPublicExtraPeersAPI)
4142

@@ -49,6 +50,7 @@ diffusionArguments
4950
)
5051
=> Tracer m (NtN.HandshakeTr ntnAddr)
5152
-> Tracer m (NtC.HandshakeTr ntcAddr)
53+
-> STM m [(PoolStake, NonEmpty LedgerRelayAccessPoint)]
5254
-> Diffusion.Arguments
5355
NoExtraState NoExtraDebugState NoExtraFlags NoExtraPeers
5456
NoExtraAPI NoExtraChurnArgs NoExtraCounters NoExtraTracer
@@ -63,7 +65,8 @@ diffusionArguments
6365
NodeToClientVersion
6466
NodeToClientVersionData
6567
diffusionArguments handshakeNtNTracer
66-
handshakeNtCTracer =
68+
handshakeNtCTracer
69+
lpGetLedgerPeers =
6770
Diffusion.Arguments {
6871
Diffusion.daNtnDataFlow = DMQ.ntnDataFlow
6972
, Diffusion.daNtnPeerSharing = peerSharing
@@ -74,7 +77,7 @@ diffusionArguments handshakeNtNTracer
7477
, Diffusion.daLedgerPeersCtx =
7578
LedgerPeersConsensusInterface {
7679
lpGetLatestSlot = return minBound
77-
, lpGetLedgerPeers = return []
80+
, lpGetLedgerPeers
7881
, lpExtraAPI = NoExtraAPI
7982
}
8083
, Diffusion.daEmptyExtraState = NoExtraState

dmq-node/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import Data.Set qualified as Set
2929
import Data.Time.Clock.POSIX (POSIXTime)
3030
import Data.Time.Clock.POSIX qualified as Time
3131
import Data.Void (Void)
32+
import Data.Word
3233
import System.Random (StdGen)
3334
import System.Random qualified as Random
3435

@@ -41,6 +42,8 @@ import Ouroboros.Network.BlockFetch (FetchClientRegistry,
4142
import Ouroboros.Network.ConnectionId (ConnectionId (..))
4243
import Ouroboros.Network.PeerSelection.Governor.Types
4344
(makePublicPeerSelectionStateVar)
45+
import Ouroboros.Network.PeerSelection.LedgerPeers.Type (LedgerPeerSnapshot,
46+
LedgerPeersKind (..))
4447
import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry,
4548
newPeerSharingAPI, newPeerSharingRegistry,
4649
ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME)
@@ -64,7 +67,6 @@ data NodeKernel crypto ntnAddr m =
6467
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
6568
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
6669
, mempool :: !(Mempool m SigId (Sig crypto))
67-
, evolutionConfig :: !(KES.EvolutionConfig)
6870
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId (Sig crypto))
6971
, sigMempoolSem :: !(TxMempoolSem m)
7072
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId (Sig crypto))
@@ -79,15 +81,27 @@ type PoolId = KeyHash StakePool
7981
data StakePools m = StakePools {
8082
-- | contains map of cardano pool stake snapshot obtained
8183
-- via local state query client
82-
stakePoolsVar :: StrictTVar m (Map PoolId StakeSnapshot)
84+
stakePoolsVar :: !(StrictTVar m (Map PoolId StakeSnapshot))
8385
-- | acquires validation context for signature validation
84-
, poolValidationCtx :: m PoolValidationCtx
86+
, poolValidationCtx :: !(m (PoolValidationCtx m))
87+
-- | provides only those big peers which provide SRV endpoints
88+
-- as otherwise those are cardano-nodes
89+
, ledgerBigPeersVar
90+
:: !(StrictTVar m (Maybe (LedgerPeerSnapshot BigLedgerPeers)))
91+
-- | all ledger peers, restricted to srv endpoints
92+
, ledgerPeersVar
93+
:: !(StrictTMVar m (LedgerPeerSnapshot AllLedgerPeers))
8594
}
8695

87-
data PoolValidationCtx =
88-
DMQPoolValidationCtx !UTCTime -- ^ time of context acquisition
89-
!(Maybe UTCTime) -- ^ UTC time of next epoch boundary
90-
!(Map PoolId StakeSnapshot) -- ^ for signature validation
96+
data PoolValidationCtx m =
97+
DMQPoolValidationCtx !UTCTime
98+
-- ^ time of context acquisition
99+
!(Maybe UTCTime)
100+
-- ^ UTC time of next epoch boundary for handling clock skey
101+
!(Map PoolId StakeSnapshot)
102+
-- ^ for signature validation
103+
!(StrictTVar m (Map PoolId Word64))
104+
-- ^ ocert counter to validate only monotonically increasing values
91105

92106
newNodeKernel :: ( MonadLabelledSTM m
93107
, MonadMVar m
@@ -107,15 +121,20 @@ newNodeKernel rng = do
107121
sigMempoolSem <- newTxMempoolSem
108122
let (rng', rng'') = Random.split rng
109123
sigSharedTxStateVar <- newSharedTxStateVar rng'
110-
nextEpochVar <- newTVarIO Nothing
111-
stakePoolsVar <- newTVarIO Map.empty
124+
(nextEpochVar, ocertCountersVar, stakePoolsVar, ledgerBigPeersVar, ledgerPeersVar) <- atomically $
125+
(,,,,) <$> newTVar Nothing
126+
<*> newTVar Map.empty
127+
<*> newTVar Map.empty
128+
<*> newTVar Nothing
129+
<*> newEmptyTMVar
112130
let poolValidationCtx = do
113131
(nextEpochBoundary, stakePools') <-
114-
atomically $ (,) <$> readTVar nextEpochVar <*> readTVar stakePoolsVar
132+
atomically $
133+
(,) <$> readTVar nextEpochVar <*> readTVar stakePoolsVar
115134
now <- getCurrentTime
116-
return $ DMQPoolValidationCtx now nextEpochBoundary stakePools'
135+
return $ DMQPoolValidationCtx now nextEpochBoundary stakePools' ocertCountersVar
117136

118-
stakePools = StakePools { stakePoolsVar, poolValidationCtx }
137+
stakePools = StakePools { stakePoolsVar, poolValidationCtx, ledgerBigPeersVar, ledgerPeersVar }
119138

120139
peerSharingAPI <-
121140
newPeerSharingAPI

0 commit comments

Comments
 (0)