1- {-# LANGUAGE MultiWayIf #-}
2- {-# LANGUAGE OverloadedStrings #-}
3- {-# LANGUAGE ScopedTypeVariables #-}
4- {-# LANGUAGE TemplateHaskell #-}
5- {-# LANGUAGE TypeApplications #-}
1+ {-# LANGUAGE DataKinds #-}
2+ {-# LANGUAGE DisambiguateRecordFields #-}
3+ {-# LANGUAGE MultiWayIf #-}
4+ {-# LANGUAGE OverloadedRecordDot #-}
5+ {-# LANGUAGE OverloadedStrings #-}
6+ {-# LANGUAGE ScopedTypeVariables #-}
7+ {-# LANGUAGE TemplateHaskell #-}
8+ {-# LANGUAGE TypeApplications #-}
9+ {-# LANGUAGE TypeOperators #-}
610
711module Main where
812
13+ import Control.Concurrent.Class.MonadSTM.Strict
914import Control.Monad (void , when )
1015import Control.Tracer (Tracer (.. ), nullTracer , traceWith )
1116
1217import Data.Act
1318import Data.Aeson (ToJSON )
19+ import Data.ByteString.Lazy qualified as BSL
1420import Data.Functor.Contravariant ((>$<) )
21+ import Data.List.NonEmpty (NonEmpty )
1522import Data.Maybe (maybeToList )
1623import Data.Text qualified as Text
1724import Data.Text.IO qualified as Text
@@ -23,13 +30,15 @@ import System.Random (newStdGen, split)
2330
2431import Cardano.Git.Rev (gitRev )
2532import Cardano.KESAgent.Protocols.StandardCrypto (StandardCrypto )
33+ import Cardano.Ledger.Keys (VKey (.. ))
34+ import Cardano.Ledger.Hashes (hashKey )
2635
2736import DMQ.Configuration
2837import DMQ.Configuration.CLIOptions (parseCLIOptions )
2938import DMQ.Configuration.Topology (readTopologyFileOrError )
3039import DMQ.Diffusion.Applications (diffusionApplications )
3140import DMQ.Diffusion.Arguments
32- import DMQ.Diffusion.NodeKernel ( mempool , withNodeKernel )
41+ import DMQ.Diffusion.NodeKernel
3342import DMQ.Handlers.TopLevel (toplevelExceptionHandler )
3443import DMQ.NodeToClient qualified as NtC
3544import DMQ.NodeToNode (NodeToNodeVersion , dmqCodecs , dmqLimitsAndTimeouts ,
@@ -39,9 +48,14 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..))
3948import DMQ.Tracer
4049
4150import DMQ.Diffusion.PeerSelection (policy )
51+ import DMQ.NodeToClient.LocalStateQueryClient
52+ import DMQ.Protocol.SigSubmission.Validate
4253import Ouroboros.Network.Diffusion qualified as Diffusion
54+ import Ouroboros.Network.PeerSelection.LedgerPeers.Type
4355import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress ,
4456 encodeRemoteAddress )
57+ import Ouroboros.Network.SizeInBytes
58+ import Ouroboros.Network.Snocket
4559import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
4660
4761import Paths_dmq_node qualified as Meta
@@ -70,6 +84,7 @@ runDMQ commandLineConfig = do
7084 dmqcTopologyFile = I topologyFile,
7185 dmqcHandshakeTracer = I handshakeTracer,
7286 dmqcLocalHandshakeTracer = I localHandshakeTracer,
87+ dmqcCardanoNodeSocket = I snocketPath,
7388 dmqcVersion = I version
7489 } = config' <> commandLineConfig
7590 `act`
@@ -101,49 +116,75 @@ runDMQ commandLineConfig = do
101116 stdGen <- newStdGen
102117 let (psRng, policyRng) = split stdGen
103118
104- withNodeKernel @ StandardCrypto
105- tracer
106- dmqConfig
107- psRng $ \ nodeKernel -> do
108- dmqDiffusionConfiguration <- mkDiffusionConfiguration dmqConfig nt
109-
110- let dmqNtNApps =
111- ntnApps tracer
112- dmqConfig
113- nodeKernel
114- (dmqCodecs
115- (encodeRemoteAddress (maxBound @ NodeToNodeVersion ))
116- (decodeRemoteAddress (maxBound @ NodeToNodeVersion )))
117- dmqLimitsAndTimeouts
118- defaultSigDecisionPolicy
119- dmqNtCApps =
120- let sigSize _ = 0 -- TODO
121- maxMsgs = 1000 -- TODO: make this dynamic?
122- mempoolReader = Mempool. getReader sigId sigSize (mempool nodeKernel)
123- mempoolWriter = Mempool. getWriter sigId (pure () )
124- (\ _ _ -> Right () :: Either Void () )
125- (\ _ -> True )
126- (mempool nodeKernel)
127- in NtC. ntcApps tracer dmqConfig
128- mempoolReader mempoolWriter maxMsgs
129- (NtC. dmqCodecs encodeReject decodeReject)
130- dmqDiffusionArguments =
131- diffusionArguments (if handshakeTracer
132- then WithEventType " Handshake" >$< tracer
133- else nullTracer)
134- (if localHandshakeTracer
135- then WithEventType " Handshake" >$< tracer
136- else nullTracer)
137- dmqDiffusionApplications =
138- diffusionApplications nodeKernel
139- dmqConfig
140- dmqDiffusionConfiguration
141- dmqLimitsAndTimeouts
142- dmqNtNApps
143- dmqNtCApps
144- (policy policyRng)
145-
146- Diffusion. run dmqDiffusionArguments
147- (dmqDiffusionTracers dmqConfig tracer)
148- dmqDiffusionConfiguration
149- dmqDiffusionApplications
119+ -- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port.
120+ Diffusion. withIOManager \ iocp -> do
121+ let localSnocket' = localSnocket iocp
122+ mkStakePoolMonitor = connectToCardanoNode tracer localSnocket' snocketPath
123+
124+ withNodeKernel @ StandardCrypto
125+ tracer
126+ dmqConfig
127+ psRng
128+ mkStakePoolMonitor $ \ nodeKernel -> do
129+ dmqDiffusionConfiguration <-
130+ mkDiffusionConfiguration dmqConfig nt nodeKernel. stakePools. ledgerBigPeersVar
131+
132+ let sigSize :: Sig StandardCrypto -> SizeInBytes
133+ sigSize = fromIntegral . BSL. length . sigRawBytes
134+ mempoolReader = Mempool. getReader sigId sigSize (mempool nodeKernel)
135+ dmqNtNApps =
136+ let ntnMempoolWriter = Mempool. writerAdapter $
137+ Mempool. getWriter sigId
138+ (poolValidationCtx $ stakePools nodeKernel)
139+ (validateSig (hashKey . VKey ))
140+ SigDuplicate
141+ (mempool nodeKernel)
142+ in ntnApps tracer
143+ dmqConfig
144+ mempoolReader
145+ ntnMempoolWriter
146+ sigSize
147+ nodeKernel
148+ (dmqCodecs
149+ -- TODO: `maxBound :: Cardano.Network.NodeToNode.NodeToNodeVersion`
150+ -- is unsafe here!
151+ (encodeRemoteAddress (maxBound @ NodeToNodeVersion ))
152+ (decodeRemoteAddress (maxBound @ NodeToNodeVersion )))
153+ dmqLimitsAndTimeouts
154+ defaultSigDecisionPolicy
155+ dmqNtCApps =
156+ let ntcMempoolWriter =
157+ Mempool. getWriter sigId
158+ (poolValidationCtx $ stakePools nodeKernel)
159+ (validateSig (hashKey . VKey ))
160+ SigDuplicate
161+ (mempool nodeKernel)
162+ in NtC. ntcApps tracer dmqConfig
163+ mempoolReader ntcMempoolWriter
164+ (NtC. dmqCodecs encodeReject decodeReject)
165+ dmqDiffusionArguments =
166+ diffusionArguments (if handshakeTracer
167+ then WithEventType " Handshake" >$< tracer
168+ else nullTracer)
169+ (if localHandshakeTracer
170+ then WithEventType " Handshake" >$< tracer
171+ else nullTracer)
172+ $ maybe [] out <$> tryReadTMVar nodeKernel. stakePools. ledgerPeersVar
173+ where
174+ out :: LedgerPeerSnapshot AllLedgerPeers
175+ -> [(PoolStake , NonEmpty LedgerRelayAccessPoint )]
176+ out (LedgerAllPeerSnapshotV23 _pt _magic relays) = relays
177+
178+ dmqDiffusionApplications =
179+ diffusionApplications nodeKernel
180+ dmqConfig
181+ dmqDiffusionConfiguration
182+ dmqLimitsAndTimeouts
183+ dmqNtNApps
184+ dmqNtCApps
185+ (policy policyRng)
186+
187+ Diffusion. run dmqDiffusionArguments
188+ (dmqDiffusionTracers dmqConfig tracer)
189+ dmqDiffusionConfiguration
190+ dmqDiffusionApplications
0 commit comments