33{-# LANGUAGE DataKinds #-}
44{-# LANGUAGE FlexibleInstances #-}
55{-# LANGUAGE PackageImports #-}
6+ {-# LANGUAGE RecordWildCards #-}
67{-# LANGUAGE ScopedTypeVariables #-}
78{-# LANGUAGE TypeApplications #-}
89{-# LANGUAGE ViewPatterns #-}
910
11+ {-# OPTIONS_GHC -Wno-partial-fields #-}
12+
1013module Trace.Forward.Forwarding
11- ( initForwarding
14+ ( InitForwardingConfig (.. )
15+ , initForwarding
1216 , initForwardingDelayed
1317 ) where
1418
@@ -26,22 +30,21 @@ import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionData
2630import Ouroboros.Network.Protocol.Handshake.Type (Handshake )
2731import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion , queryVersion ,
2832 simpleSingletonVersions )
29- import Ouroboros.Network.Snocket (MakeBearer , Snocket , localAddressFromPath , localSnocket ,
30- makeLocalBearer , LocalAddress , socketSnocket , makeSocketBearer , LocalSocket )
31- import Ouroboros.Network.Socket (ConnectToArgs (.. ),
32- HandshakeCallbacks (.. ), SomeResponderApplication (.. ),
33- connectToNode , nullNetworkConnectTracers )
3433import qualified Ouroboros.Network.Server.Simple as Server
34+ import Ouroboros.Network.Snocket (LocalAddress , LocalSocket , MakeBearer , Snocket ,
35+ localAddressFromPath , localSnocket , makeLocalBearer , makeSocketBearer ,
36+ socketSnocket )
37+ import Ouroboros.Network.Socket (ConnectToArgs (.. ), HandshakeCallbacks (.. ),
38+ SomeResponderApplication (.. ), connectToNode , nullNetworkConnectTracers )
3539
3640import Codec.CBOR.Term (Term )
3741import Control.Concurrent.Async (async , wait )
38- import Control.Exception (throwIO )
42+ import Control.Exception (SomeException , throwIO )
3943import Control.Monad.IO.Class
4044import "contra-tracer" Control.Tracer (Tracer , contramap , nullTracer , stdoutTracer )
4145import qualified Data.ByteString.Lazy as LBS
4246import Data.Functor
4347import Data.List.NonEmpty (NonEmpty ((:|) ))
44- import Data.Maybe (isNothing )
4548import qualified Data.Text as Text
4649import Data.Void (Void , absurd )
4750import Data.Word (Word16 )
@@ -61,15 +64,37 @@ import Trace.Forward.Utils.ForwardSink (ForwardSink)
6164import Trace.Forward.Utils.TraceObject
6265import Trace.Forward.Utils.Version
6366
67+
68+ -- | Config record to initialise trace forwarding
69+ data InitForwardingConfig
70+ = -- | Only construct relevant values, but do not actually run forwarding
71+ InitForwardingNone
72+ | -- | Run forwarding with the provided settings
73+ InitForwardingWith
74+ { initNetworkMagic :: ! NetworkMagic
75+ -- ^ Forwarding is always tied to a singular networkId
76+ , initEKGStore :: ! (Maybe EKG. Store )
77+ -- ^ A metrics store to be forwarded (optional)
78+ , initHowToConnect :: ! HowToConnect
79+ -- ^ A LocalPipe or RemoteSocket
80+ , initForwarderMode :: ! ForwarderMode
81+ -- ^ Run as Initiator or Responder
82+ , initOnForwardInterruption :: ! (Maybe (SomeException -> IO () ))
83+ -- ^ Optional handler when forwarding connection is interrupted (may be temporary or permanent)
84+ -- default: no action
85+ , initOnQueueOverflow :: ! (Maybe ([TraceObject ] -> IO () ))
86+ -- ^ Optional handler when forwarding queue overflows (argument are objects dropped from queue)
87+ -- default: print one-liner to stderr, indicating object count and timestamps of first and last object
88+ }
89+
90+
6491initForwarding :: forall m . (MonadIO m )
6592 => IOManager
6693 -> TraceOptionForwarder
67- -> NetworkMagic
68- -> Maybe EKG. Store
69- -> Maybe (HowToConnect , ForwarderMode )
94+ -> InitForwardingConfig
7095 -> m (ForwardSink TraceObject , DataPointStore )
71- initForwarding iomgr config magic ekgStore tracerSocketMode = do
72- (a, b, kickoffForwarder) <- initForwardingDelayed iomgr config magic ekgStore tracerSocketMode
96+ initForwarding iomgr config forwarding = do
97+ (a, b, kickoffForwarder) <- initForwardingDelayed iomgr config forwarding
7398 liftIO kickoffForwarder
7499 pure (a, b)
75100
@@ -79,38 +104,34 @@ initForwardingDelayed :: forall m. ()
79104 => MonadIO m
80105 => IOManager
81106 -> TraceOptionForwarder
82- -> NetworkMagic
83- -> Maybe EKG. Store
84- -> Maybe (HowToConnect , ForwarderMode )
107+ -> InitForwardingConfig
85108 -> m (ForwardSink TraceObject , DataPointStore , IO () )
86- initForwardingDelayed iomgr config magic ekgStore tracerSocketMode = liftIO $ do
87- let ignoreOverflow, onOverflow :: [TraceObject ] -> IO ()
88- ignoreOverflow _ =
89- pure ()
90- onOverflow | isNothing tracerSocketMode = ignoreOverflow
91- | otherwise = handleOverflow
109+ initForwardingDelayed iomgr config forwarding = liftIO $ do
110+ let onOverflow :: [TraceObject ] -> IO ()
111+ onOverflow = case forwarding of
112+ InitForwardingNone -> const $ pure ()
113+ InitForwardingWith {initOnQueueOverflow = Just handler} -> handler
114+ InitForwardingWith {initOnQueueOverflow = Nothing } -> handleOverflow
92115 forwardSink <- initForwardSink tfConfig onOverflow
93116 dpStore <- initDataPointStore
94117 let
95118 kickoffForwarder = launchForwarders
96119 iomgr
97- magic
120+ forwarding
98121 ekgConfig
99122 tfConfig
100123 dpfConfig
101- ekgStore
102124 forwardSink
103125 dpStore
104- tracerSocketMode
105126 maxReconnectDelay
106127 pure (forwardSink, dpStore, kickoffForwarder)
107128 where
108129 endpoint :: EKGF. HowToConnect
109130 endpoint =
110- case tracerSocketMode of
111- Nothing -> EKGF. LocalPipe " "
112- Just ( LocalPipe str, _mode) -> EKGF. LocalPipe str
113- Just ( RemoteSocket host port, _mode) -> EKGF. RemoteSocket host port
131+ case forwarding of
132+ InitForwardingNone -> EKGF. LocalPipe " "
133+ InitForwardingWith {initHowToConnect = LocalPipe str} -> EKGF. LocalPipe str
134+ InitForwardingWith {initHowToConnect = RemoteSocket host port} -> EKGF. RemoteSocket host port
114135 queueSize = tofQueueSize config
115136 verbosity = tofVerbosity config
116137 maxReconnectDelay = tofMaxReconnectDelay config
@@ -157,39 +178,37 @@ handleOverflow (msg : msgs) =
157178
158179launchForwarders
159180 :: IOManager
160- -> NetworkMagic
181+ -> InitForwardingConfig
161182 -> EKGF. ForwarderConfiguration
162183 -> TF. ForwarderConfiguration TraceObject
163184 -> DPF. ForwarderConfiguration
164- -> Maybe EKG. Store
165185 -> ForwardSink TraceObject
166186 -> DataPointStore
167- -> Maybe (HowToConnect , ForwarderMode )
168187 -> Word
169188 -> IO ()
170- launchForwarders iomgr magic
189+ launchForwarders iomgr forwarding
171190 ekgConfig tfConfig dpfConfig
172- ekgStore sink dpStore tracerSocketMode maxReconnectDelay =
173- -- If 'tracerSocketMode' is not specified, it's impossible to establish
174- -- network connection with acceptor application (for example, 'cardano-tracer').
191+ sink dpStore maxReconnectDelay =
192+ -- If InitForwardingNone is specified, it's impossible to establish
193+ -- a connection with an acceptor application (for example, 'cardano-tracer').
175194 -- In this case, we should not launch forwarders.
176- case tracerSocketMode of
177- Nothing -> return ()
178- Just (socketPath, mode) ->
195+ case forwarding of
196+ InitForwardingNone -> return ()
197+ InitForwardingWith { .. } ->
179198 void . async $
180199 runInLoop
181200 (launchForwardersViaLocalSocket
182201 iomgr
183- magic
184- socketPath
185- mode
202+ initNetworkMagic
203+ initHowToConnect
204+ initForwarderMode
186205 ekgConfig
187206 tfConfig
188207 dpfConfig
189208 sink
190- ekgStore
209+ initEKGStore
191210 dpStore)
192- socketPath
211+ initHowToConnect
193212 1
194213 maxReconnectDelay
195214
0 commit comments