Skip to content

Commit f596da3

Browse files
facundominguezneilmayhew
authored andcommitted
Introduce a collection of chainsync handles that synchronizes a map and a queue
1 parent a020b7b commit f596da3

File tree

9 files changed

+146
-71
lines changed

9 files changed

+146
-71
lines changed

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
6262
import Ouroboros.Consensus.Mempool
6363
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
6464
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
65-
(ChainSyncClientHandle (..), ChainSyncState (..),
66-
viewChainSyncState)
65+
(ChainSyncClientHandle (..),
66+
ChainSyncClientHandleCollection (..), ChainSyncState (..),
67+
newChainSyncClientHandleCollection, viewChainSyncState)
6768
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck
6869
(HistoricityCheck)
6970
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
@@ -143,7 +144,7 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
143144
, getGsmState :: STM m GSM.GsmState
144145

145146
-- | The kill handle and exposed state for each ChainSync client.
146-
, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
147+
, getChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
147148

148149
-- | Read the current peer sharing registry, used for interacting with
149150
-- the PeerSharing protocol
@@ -252,7 +253,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
252253
<&> \wd (_headers, lst) ->
253254
GSM.getDurationUntilTooOld wd (getTipSlot lst)
254255
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
255-
, GSM.getChainSyncStates = fmap cschState <$> readTVar varChainSyncHandles
256+
, GSM.getChainSyncStates = fmap cschState <$> cschcMap varChainSyncHandles
256257
, GSM.getCurrentSelection = do
257258
headers <- ChainDB.getCurrentChain chainDB
258259
extLedgerState <- ChainDB.getCurrentLedger chainDB
@@ -264,7 +265,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
264265
, GSM.writeGsmState = \gsmState ->
265266
atomicallyWithMonotonicTime $ \time -> do
266267
writeTVar varGsmState gsmState
267-
handles <- readTVar varChainSyncHandles
268+
handles <- cschcMap varChainSyncHandles
268269
traverse_ (($ time) . ($ gsmState) . cschOnGsmStateChanged) handles
269270
, GSM.isHaaSatisfied = do
270271
readTVar varOutboundConnectionsState <&> \case
@@ -299,7 +300,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
299300
chainDB
300301
(readTVar varGsmState)
301302
-- TODO GDD should only consider (big) ledger peers
302-
(readTVar varChainSyncHandles)
303+
(cschcMap varChainSyncHandles)
303304
varLoEFragment
304305

305306
void $ forkLinkedThread registry "NodeKernel.blockForging" $
@@ -356,7 +357,7 @@ data InternalState m addrNTN addrNTC blk = IS {
356357
, chainDB :: ChainDB m blk
357358
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
358359
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m
359-
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
360+
, varChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
360361
, varGsmState :: StrictTVar m GSM.GsmState
361362
, mempool :: Mempool m blk
362363
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
@@ -385,7 +386,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
385386
gsmMarkerFileView
386387
newTVarIO gsmState
387388

388-
varChainSyncHandles <- newTVarIO mempty
389+
varChainSyncHandles <- atomically newChainSyncClientHandleCollection
389390
mempool <- openMempool registry
390391
(chainDBLedgerInterface chainDB)
391392
(configLedger cfg)
@@ -395,7 +396,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
395396
fetchClientRegistry <- newFetchClientRegistry
396397

397398
let getCandidates :: STM m (Map (ConnectionId addrNTN) (AnchoredFragment (Header blk)))
398-
getCandidates = viewChainSyncState varChainSyncHandles csCandidate
399+
getCandidates = viewChainSyncState (cschcMap varChainSyncHandles) csCandidate
399400

400401
slotForgeTimeOracle <- BlockFetchClientInterface.initSlotForgeTimeOracle cfg chainDB
401402
let readFetchMode = BlockFetchClientInterface.readFetchModeDefault

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/CSJInvariants.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import Data.Typeable (Typeable)
1919
import Ouroboros.Consensus.Block (Point, StandardHash, castPoint)
2020
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State as CSState
2121
import Ouroboros.Consensus.Util.IOLike (Exception, MonadSTM (STM),
22-
MonadThrow (throwIO), StrictTVar, readTVar)
22+
MonadThrow (throwIO), readTVar)
2323
import Ouroboros.Consensus.Util.STM (Watcher (..))
2424

2525
--------------------------------------------------------------------------------
@@ -109,10 +109,10 @@ readAndView ::
109109
forall m peer blk.
110110
( MonadSTM m
111111
) =>
112-
StrictTVar m (Map peer (CSState.ChainSyncClientHandle m blk)) ->
112+
STM m (Map peer (CSState.ChainSyncClientHandle m blk)) ->
113113
STM m (View peer blk)
114-
readAndView handles =
115-
traverse (fmap idealiseState . readTVar . CSState.cschJumping) =<< readTVar handles
114+
readAndView readHandles =
115+
traverse (fmap idealiseState . readTVar . CSState.cschJumping) =<< readHandles
116116
where
117117
-- Idealise the state of a ChainSync peer with respect to ChainSync jumping.
118118
-- In particular, we get rid of non-comparable information such as the TVars
@@ -170,7 +170,7 @@ watcher ::
170170
Typeable blk,
171171
StandardHash blk
172172
) =>
173-
StrictTVar m (Map peer (CSState.ChainSyncClientHandle m blk)) ->
173+
STM m (Map peer (CSState.ChainSyncClientHandle m blk)) ->
174174
Watcher m (View peer blk) (View peer blk)
175175
watcher handles =
176176
Watcher

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ChainSync.hs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ module Test.Consensus.PeerSimulator.ChainSync (
1313
import Control.Exception (SomeException)
1414
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
1515
import Control.Tracer (Tracer (Tracer), nullTracer, traceWith)
16-
import Data.Map.Strict (Map)
1716
import Data.Proxy (Proxy (..))
1817
import Network.TypedProtocol.Codec (AnyMessage)
1918
import Ouroboros.Consensus.Block (Header, Point)
@@ -23,16 +22,17 @@ import Ouroboros.Consensus.Config (DiffusionPipeliningSupport (..),
2322
import Ouroboros.Consensus.Ledger.SupportsProtocol
2423
(LedgerSupportsProtocol)
2524
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
26-
(CSJConfig (..), ChainDbView, ChainSyncClientHandle,
27-
ChainSyncLoPBucketConfig, ChainSyncStateView (..),
28-
Consensus, bracketChainSyncClient, chainSyncClient)
25+
(CSJConfig (..), ChainDbView,
26+
ChainSyncClientHandleCollection, ChainSyncLoPBucketConfig,
27+
ChainSyncStateView (..), Consensus, bracketChainSyncClient,
28+
chainSyncClient)
2929
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
3030
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck as HistoricityCheck
3131
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck
3232
import Ouroboros.Consensus.Node.GsmState (GsmState (Syncing))
3333
import Ouroboros.Consensus.Util (ShowProxy)
3434
import Ouroboros.Consensus.Util.IOLike (Exception (fromException),
35-
IOLike, MonadCatch (try), StrictTVar)
35+
IOLike, MonadCatch (try))
3636
import Ouroboros.Network.Block (Tip)
3737
import Ouroboros.Network.Channel (Channel)
3838
import Ouroboros.Network.ControlMessage (ControlMessage (..))
@@ -134,7 +134,7 @@ runChainSyncClient ::
134134
-- ^ Configuration for ChainSync Jumping
135135
StateViewTracers blk m ->
136136
-- ^ Tracers used to record information for the future 'StateView'.
137-
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
137+
ChainSyncClientHandleCollection PeerId m blk ->
138138
-- ^ A TVar containing a map of states for each peer. This
139139
-- function will (via 'bracketChainSyncClient') register and de-register a
140140
-- TVar for the state of the peer.

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/NodeLifecycle.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import Data.Set (Set)
2020
import qualified Data.Set as Set
2121
import Ouroboros.Consensus.Block
2222
import Ouroboros.Consensus.Config (TopLevelConfig (..))
23+
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
24+
(ChainSyncClientHandleCollection (..))
2325
import Ouroboros.Consensus.Storage.ChainDB.API
2426
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
2527
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl as ChainDB
@@ -204,7 +206,7 @@ lifecycleStop resources LiveNode {lnStateViewTracers, lnCopyToImmDb, lnPeers} =
204206
releaseAll lrRegistry
205207
-- Reset the resources in TVars that were allocated by the simulator
206208
atomically $ do
207-
modifyTVar psrHandles (const mempty)
209+
cschcRemoveAllHandles psrHandles
208210
case lrLoEVar of
209211
LoEEnabled var -> modifyTVar var (const (AF.Empty AF.AnchorGenesis))
210212
LoEDisabled -> pure ()

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Resources.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ import Data.Traversable (for)
2525
import Ouroboros.Consensus.Block (WithOrigin (Origin))
2626
import Ouroboros.Consensus.Block.Abstract (Header, Point (..))
2727
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
28-
(ChainSyncClientHandle)
28+
(ChainSyncClientHandleCollection,
29+
newChainSyncClientHandleCollection)
2930
import Ouroboros.Consensus.Util.IOLike (IOLike, MonadSTM (STM),
3031
StrictTVar, readTVar, uncheckedNewTVarM, writeTVar)
3132
import qualified Ouroboros.Network.AnchoredFragment as AF
@@ -115,7 +116,7 @@ data PeerSimulatorResources m blk =
115116

116117
-- | Handles to interact with the ChainSync client of each peer.
117118
-- See 'ChainSyncClientHandle' for more details.
118-
psrHandles :: StrictTVar m (Map PeerId (ChainSyncClientHandle m TestBlock))
119+
psrHandles :: ChainSyncClientHandleCollection PeerId m TestBlock
119120
}
120121

121122
-- | Create 'ChainSyncServerHandlers' for our default implementation using 'NodeState'.
@@ -233,5 +234,5 @@ makePeerSimulatorResources tracer blockTree peers = do
233234
resources <- for peers $ \ peerId -> do
234235
peerResources <- makePeerResources tracer blockTree peerId
235236
pure (peerId, peerResources)
236-
psrHandles <- uncheckedNewTVarM mempty
237+
psrHandles <- atomically newChainSyncClientHandleCollection
237238
pure PeerSimulatorResources {psrPeers = Map.fromList $ toList resources, psrHandles}

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
2727
(LedgerSupportsProtocol)
2828
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
2929
(CSJConfig (..), CSJEnabledConfig (..), ChainDbView,
30-
ChainSyncClientHandle, ChainSyncLoPBucketConfig (..),
30+
ChainSyncClientHandle,
31+
ChainSyncClientHandleCollection (..),
32+
ChainSyncLoPBucketConfig (..),
3133
ChainSyncLoPBucketEnabledConfig (..), viewChainSyncState)
3234
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
3335
import qualified Ouroboros.Consensus.Node.GsmState as GSM
@@ -147,7 +149,7 @@ startChainSyncConnectionThread ::
147149
ChainSyncLoPBucketConfig ->
148150
CSJConfig ->
149151
StateViewTracers blk m ->
150-
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
152+
ChainSyncClientHandleCollection PeerId m blk ->
151153
m (Thread m (), Thread m ())
152154
startChainSyncConnectionThread
153155
registry
@@ -230,7 +232,7 @@ smartDelay _ node duration = do
230232
dispatchTick :: forall m blk.
231233
IOLike m =>
232234
Tracer m (TraceSchedulerEvent blk) ->
233-
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
235+
STM m (Map PeerId (ChainSyncClientHandle m blk)) ->
234236
Map PeerId (PeerResources m blk) ->
235237
NodeLifecycle blk m ->
236238
LiveNode blk m ->
@@ -250,7 +252,7 @@ dispatchTick tracer varHandles peers lifecycle node (number, (duration, Peer pid
250252
traceNewTick = do
251253
currentChain <- atomically $ ChainDB.getCurrentChain (lnChainDb node)
252254
(csState, jumpingStates) <- atomically $ do
253-
m <- readTVar varHandles
255+
m <- varHandles
254256
csState <- traverse (readTVar . CSClient.cschState) (m Map.!? pid)
255257
jumpingStates <- forM (Map.toList m) $ \(peer, h) -> do
256258
st <- readTVar (CSClient.cschJumping h)
@@ -272,7 +274,7 @@ dispatchTick tracer varHandles peers lifecycle node (number, (duration, Peer pid
272274
runScheduler ::
273275
IOLike m =>
274276
Tracer m (TraceSchedulerEvent blk) ->
275-
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
277+
STM m (Map PeerId (ChainSyncClientHandle m blk)) ->
276278
PointSchedule blk ->
277279
Map PeerId (PeerResources m blk) ->
278280
NodeLifecycle blk m ->
@@ -314,7 +316,7 @@ mkStateTracer ::
314316
m (Tracer m ())
315317
mkStateTracer schedulerConfig GenesisTest {gtBlockTree} PeerSimulatorResources {psrHandles, psrPeers} chainDb
316318
| scTraceState schedulerConfig
317-
, let getCandidates = viewChainSyncState psrHandles CSClient.csCandidate
319+
, let getCandidates = viewChainSyncState (cschcMap psrHandles) CSClient.csCandidate
318320
getCurrentChain = ChainDB.getCurrentChain chainDb
319321
getPoints = traverse readTVar (srCurrentState . prShared <$> psrPeers)
320322
= peerSimStateDiagramSTMTracerDebug gtBlockTree getCurrentChain getCandidates getPoints
@@ -335,7 +337,7 @@ startNode ::
335337
startNode schedulerConfig genesisTest interval = do
336338
let
337339
handles = psrHandles lrPeerSim
338-
getCandidates = viewChainSyncState handles CSClient.csCandidate
340+
getCandidates = viewChainSyncState (cschcMap handles) CSClient.csCandidate
339341
fetchClientRegistry <- newFetchClientRegistry
340342
let chainDbView = CSClient.defaultChainDbView lnChainDb
341343
activePeers = Map.restrictKeys (psrPeers lrPeerSim) (lirActive liveResult)
@@ -384,10 +386,11 @@ startNode schedulerConfig genesisTest interval = do
384386
(mkGDDTracerTestBlock lrTracer)
385387
lnChainDb
386388
(pure GSM.Syncing) -- TODO actually run GSM
387-
(readTVar handles)
389+
(cschcMap handles)
388390
var
389391

390-
void $ forkLinkedWatcher lrRegistry "CSJ invariants watcher" $ CSJInvariants.watcher handles
392+
void $ forkLinkedWatcher lrRegistry "CSJ invariants watcher" $
393+
CSJInvariants.watcher (cschcMap handles)
391394
where
392395
LiveResources {lrRegistry, lrTracer, lrConfig, lrPeerSim, lrLoEVar} = resources
393396

@@ -483,7 +486,7 @@ runPointSchedule schedulerConfig genesisTest tracer0 =
483486
lifecycle <- nodeLifecycle schedulerConfig genesisTest tracer registry peerSim
484487
(chainDb, stateViewTracers) <- runScheduler
485488
(Tracer $ traceWith tracer . TraceSchedulerEvent)
486-
(psrHandles peerSim)
489+
(cschcMap (psrHandles peerSim))
487490
gtSchedule
488491
(psrPeers peerSim)
489492
lifecycle

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,12 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client (
6363
, TraceChainSyncClientEvent (..)
6464
-- * State shared with other components
6565
, ChainSyncClientHandle (..)
66+
, ChainSyncClientHandleCollection (..)
6667
, ChainSyncState (..)
6768
, ChainSyncStateView (..)
6869
, Jumping.noJumping
6970
, chainSyncStateFor
71+
, newChainSyncClientHandleCollection
7072
, noIdling
7173
, noLoPBucket
7274
, viewChainSyncState
@@ -231,11 +233,11 @@ newtype Our a = Our { unOur :: a }
231233
-- data from 'ChainSyncState'.
232234
viewChainSyncState ::
233235
IOLike m =>
234-
StrictTVar m (Map peer (ChainSyncClientHandle m blk)) ->
236+
STM m (Map peer (ChainSyncClientHandle m blk)) ->
235237
(ChainSyncState blk -> a) ->
236238
STM m (Map peer a)
237-
viewChainSyncState varHandles f =
238-
Map.map f <$> (traverse (readTVar . cschState) =<< readTVar varHandles)
239+
viewChainSyncState readHandles f =
240+
Map.map f <$> (traverse (readTVar . cschState) =<< readHandles)
239241

240242
-- | Convenience function for reading the 'ChainSyncState' for a single peer
241243
-- from a nested set of TVars.
@@ -329,7 +331,7 @@ bracketChainSyncClient ::
329331
)
330332
=> Tracer m (TraceChainSyncClientEvent blk)
331333
-> ChainDbView m blk
332-
-> StrictTVar m (Map peer (ChainSyncClientHandle m blk))
334+
-> ChainSyncClientHandleCollection peer m blk
333335
-- ^ The kill handle and states for each peer, we need the whole map because we
334336
-- (de)register nodes (@peer@).
335337
-> STM m GsmState
@@ -404,8 +406,8 @@ bracketChainSyncClient
404406
insertHandle = atomicallyWithMonotonicTime $ \time -> do
405407
initialGsmState <- getGsmState
406408
updateLopBucketConfig lopBucket initialGsmState time
407-
modifyTVar varHandles $ Map.insert peer handle
408-
deleteHandle = atomically $ modifyTVar varHandles $ Map.delete peer
409+
cschcAddHandle varHandles peer handle
410+
deleteHandle = atomically $ cschcRemoveHandle varHandles peer
409411
bracket_ insertHandle deleteHandle $ f Jumping.noJumping
410412

411413
withCSJCallbacks lopBucket csHandleState (CSJEnabled csjEnabledConfig) f =

0 commit comments

Comments
 (0)