Skip to content

Commit d80e5a5

Browse files
authored
Label threads, queues and vars (#2172)
#### Motivation Labels make it easier to track the origin and lifecycle of threads, queues, and transactional variables, especially when used in simulations (e.g. `IOSim`) or debugging tools. #### 🔄 Changes This PR introduces **labelled variants** of common concurrency primitives and combinators to improve observability and debugging. Replaced existing constructors/combinators with labelled versions: * **STM / TVars / Queues** * `newTVar` → `newLabelledTVar` * `newTVarIO` → `newLabelledTVarIO` * `newEmptyTMVar` → `newLabelledEmptyTMVar` * `newEmptyTMVarIO` → `newLabelledEmptyTMVarIO` * `newTQueue` → `newLabelledTQueue` * `newTQueueIO` → `newLabelledTQueueIO` * `newTBQueue` → `newLabelledTBQueue` * `newTBQueueIO` → `newLabelledTBQueueIO` * **Async / Thread combinators** (based on `Control.Monad.Class.MonadFork.labelThisThread`) * `race` → `raceLabelled` * `race_` → `raceLabelled_` * `withAsync` → `withAsyncLabelled` * `async` → `asyncLabelled` * `concurrently` → `concurrentlyLabelled` * `concurrently_` → `concurrentlyLabelled_` #### 🔍 Usage not found * `newTMVar`, `newTMVarIO` * `forkIO` and its variants #### ⚠️ Unresolved * `newEmptyMVar` * `newMVar` > **Reason:** Labelled versions exist in > > * [`MonadMVar` docs](https://hackage.haskell.org/package/io-classes-1.8.0.1/docs/Control-Concurrent-Class-MonadMVar.html) > * [`io-classes` changelog](https://github.com/input-output-hk/io-sim/blob/aadd3602b9b75640b042a02e0e62d10dd399b610/io-classes/CHANGELOG.md?plain=1#L23) > > However, their `IO` implementations are no-ops (labels are only effective under `IOSim`). > We need to decide whether to wrap these in our codebase or leave them unlabelled for now. --- <!-- Consider each and tick it off one way or the other --> * [X] CHANGELOG updated or not needed * [X] Documentation updated or not needed * [X] Haddocks updated or not needed * [X] No new TODOs introduced or explained herafter
2 parents 635e567 + d9e6cbd commit d80e5a5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+355
-278
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,13 @@ changes.
6565

6666
- **BREAKING** Rename `--script-info` option to `--hydra-script-catalogue` in the `hydra-node` CLI.
6767

68-
Fix rotation log id consistency after restart by changing the rotation check to trigger only
68+
- Fix rotation log id consistency after restart by changing the rotation check to trigger only
6969
when the number of persisted `StateChanged` events exceeds the configured `--persistence-rotate-after` threshold.
7070
* This also prevents immediate rotation on startup when the threshold is set to 1.
7171
* `Checkpoint` event ids now match the suffix of their preceding rotated log file and the last `StateChanged` event id within it,
7272
preserving sequential order and making it easier to identify which rotated log file was used to compute it.
7373

74+
- Label threads, queues and vars.
7475

7576
## [0.22.2] - 2025-06-30
7677

hydra-chain-observer/src/Hydra/Blockfrost/ChainObserver.hs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import Blockfrost.Client (
1111
import Blockfrost.Client qualified as Blockfrost
1212
import Control.Concurrent.Class.MonadSTM (
1313
MonadSTM (readTVarIO),
14-
newTVarIO,
1514
writeTVar,
1615
)
1716
import Control.Retry (RetryPolicyM, RetryStatus, constantDelay, retrying)
@@ -96,7 +95,7 @@ blockfrostClient tracer projectPath blockConfirmations = do
9695

9796
let blockHash = fromChainPoint chainPoint genesisBlockHash
9897

99-
stateTVar <- newTVarIO (blockHash, mempty)
98+
stateTVar <- newLabelledTVarIO "blockfrost-client-state" (blockHash, mempty)
10099
void $
101100
retrying (retryPolicy blockTime) shouldRetry $ \_ -> do
102101
loop tracer prj networkId blockTime observerHandler blockConfirmations stateTVar

hydra-cluster/bench/Bench/EndToEnd.hs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import Control.Concurrent.Class.MonadSTM (
1212
check,
1313
lengthTBQueue,
1414
modifyTVar,
15-
newTBQueueIO,
16-
newTVarIO,
1715
tryReadTBQueue,
1816
writeTBQueue,
1917
writeTVar,
@@ -83,7 +81,7 @@ bench startingNodeId timeoutSeconds workDir dataset = do
8381
putTextLn "Starting benchmark"
8482
let cardanoKeys = hydraNodeKeys dataset <&> \sk -> (getVerificationKey sk, sk)
8583
let hydraKeys = generateSigningKey . show <$> [1 .. toInteger (length cardanoKeys)]
86-
statsTvar <- newTVarIO mempty
84+
statsTvar <- newLabelledTVarIO "bench-stats" mempty
8785
scenarioData <- withOSStats workDir statsTvar $
8886
withCardanoNodeDevnet (contramap FromCardanoNode tracer) workDir $ \_ backend -> do
8987
let nodeSocket' = case Backend.getOptions backend of
@@ -261,13 +259,14 @@ withOSStats workDir tvar action =
261259
Nothing -> action
262260
Just _ ->
263261
withCreateProcess process{std_out = CreatePipe} $ \_stdin out _stderr _processHandle ->
264-
race
265-
( do
262+
raceLabelled
263+
( "os-stats-collect"
264+
, do
266265
-- Write the header
267266
atomically $ writeTVar tvar [" | Time | Used | Free | ", "|------------------------------------|------|------|"]
268267
collectStats tvar out
269268
)
270-
action
269+
("os-stats-action", action)
271270
>>= \case
272271
Left _ -> failure "dool process failed unexpectedly"
273272
Right a -> pure a
@@ -384,12 +383,15 @@ processTransactions clients clientDatasets = do
384383

385384
clientProcessDataset (ClientDataset{txSequence}, client) clientId = do
386385
let numberOfTxs = length txSequence
387-
submissionQ <- newTBQueueIO (fromIntegral numberOfTxs)
386+
submissionQ <- newLabelledTBQueueIO "submission" (fromIntegral numberOfTxs)
388387
registry <- newRegistry
389388
atomically $ forM_ txSequence $ writeTBQueue submissionQ
390-
( submitTxs client registry submissionQ
391-
`concurrently_` waitForAllConfirmations client registry (Set.fromList $ map txId txSequence)
392-
`concurrently_` progressReport (hydraNodeId client) clientId numberOfTxs submissionQ
389+
concurrentlyLabelled_
390+
("submit-txs", submitTxs client registry submissionQ)
391+
( "confirm-txs"
392+
, concurrentlyLabelled_
393+
("wait-for-all-confirmations", waitForAllConfirmations client registry (Set.fromList $ map txId txSequence))
394+
("progress-report", progressReport (hydraNodeId client) clientId numberOfTxs submissionQ)
393395
)
394396
`catch` \(HUnitFailure sourceLocation reason) ->
395397
putStrLn ("Something went wrong while waiting for all confirmations: " <> formatLocation sourceLocation <> ": " <> formatFailureReason reason)
@@ -443,8 +445,8 @@ data Registry tx = Registry
443445
newRegistry ::
444446
IO (Registry Tx)
445447
newRegistry = do
446-
processedTxs <- newTVarIO mempty
447-
latestSnapshot <- newTVarIO 0
448+
processedTxs <- newLabelledTVarIO "registry-processed-txs" mempty
449+
latestSnapshot <- newLabelledTVarIO "registry-latest-snapshot" 0
448450
pure $ Registry{processedTxs, latestSnapshot}
449451

450452
submitTxs ::

hydra-cluster/src/CardanoNode.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,9 @@ withCardanoNode tr stateDirectory args action = do
322322
withCreateProcess process{std_out = UseHandle out, std_err = CreatePipe} $
323323
\_stdin _stdout mError processHandle ->
324324
(`finally` cleanupSocketFile) $
325-
race (checkProcessHasNotDied "cardano-node" processHandle mError) waitForNode
325+
raceLabelled
326+
("check-cardano-node-process-not-died", checkProcessHasNotDied "cardano-node" processHandle mError)
327+
("wait-for-node", waitForNode)
326328
<&> either absurd id
327329
where
328330
CardanoNodeArgs{nodeSocket} = args

hydra-cluster/src/Hydra/Cluster/Scenarios.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1896,9 +1896,9 @@ threeNodesWithMirrorParty tracer workDir backend hydraScriptsTxId = do
18961896
-- N1 & N3 commit the same thing at the same time
18971897
-- XXX: one will fail but the head will still open
18981898
aliceUTxO <- seedFromFaucet backend aliceCardanoVk 1_000_000 (contramap FromFaucet tracer)
1899-
race_
1900-
(requestCommitTx n1 aliceUTxO >>= Backend.submitTransaction backend)
1901-
(requestCommitTx n3 aliceUTxO >>= Backend.submitTransaction backend)
1899+
raceLabelled_
1900+
("request-commit-tx-n1", requestCommitTx n1 aliceUTxO >>= Backend.submitTransaction backend)
1901+
("request-commit-tx-n3", requestCommitTx n3 aliceUTxO >>= Backend.submitTransaction backend)
19021902

19031903
-- N2 commits something
19041904
bobUTxO <- seedFromFaucet backend bobCardanoVk 1_000_000 (contramap FromFaucet tracer)

hydra-cluster/src/HydraNode.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import Hydra.Prelude hiding (STM, delete)
77

88
import CardanoNode (cliQueryProtocolParameters)
99
import Control.Concurrent.Async (forConcurrently_)
10-
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO)
10+
import Control.Concurrent.Class.MonadSTM (modifyTVar', readTVarIO)
1111
import Control.Exception (Handler (..), IOException, catches)
1212
import Control.Lens ((?~))
1313
import Control.Monad.Class.MonadAsync (forConcurrently)
@@ -105,7 +105,7 @@ waitNoMatch delay client match = do
105105
-- | Wait up to some time for an API server output to match the given predicate.
106106
waitMatch :: HasCallStack => NominalDiffTime -> HydraClient -> (Aeson.Value -> Maybe a) -> IO a
107107
waitMatch delay client@HydraClient{tracer, hydraNodeId} match = do
108-
seenMsgs <- newTVarIO []
108+
seenMsgs <- newLabelledTVarIO "wait-match-seen-msgs" []
109109
timeout (realToFrac delay) (go seenMsgs) >>= \case
110110
Just x -> pure x
111111
Nothing -> do
@@ -453,9 +453,9 @@ withPreparedHydraNode tracer workDir hydraNodeId runOptions action =
453453

454454
withProcessTerm cmd $ \p -> do
455455
-- NOTE: exit code thread gets cancelled if 'action' terminates first
456-
race
457-
(collectAndCheckExitCode p)
458-
(withConnectionToNode tracer hydraNodeId action)
456+
raceLabelled
457+
("collect-check-process-exit-code", collectAndCheckExitCode p)
458+
("with-connection-to-node", withConnectionToNode tracer hydraNodeId action)
459459
<&> either absurd id
460460
where
461461
collectAndCheckExitCode p = do

hydra-cluster/test/Test/BlockfrostChainSpec.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import Hydra.Prelude
66
import Test.Hydra.Prelude
77

88
import Cardano.Api.UTxO qualified as UTxO
9-
import Control.Concurrent.STM (newEmptyTMVarIO, takeTMVar)
9+
import Control.Concurrent.STM (takeTMVar)
1010
import Control.Concurrent.STM.TMVar (putTMVar)
1111
import Control.Exception (IOException)
1212
import Hydra.Chain (
@@ -164,7 +164,7 @@ withBlockfrostChainTest tracer config party action = do
164164
_ -> failure $ "unexpected chainBackendOptions: " <> show chainBackendOptions
165165
otherConfig -> failure $ "unexpected chainConfig: " <> show otherConfig
166166
ctx <- loadChainContext backend configuration party
167-
eventMVar <- newEmptyTMVarIO
167+
eventMVar <- newLabelledEmptyTMVarIO "blockfrost-chain-events"
168168

169169
let callback event = atomically $ putTMVar eventMVar event
170170

hydra-cluster/test/Test/ChainObserverSpec.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import Test.Hydra.Prelude
1111

1212
import Cardano.Api.UTxO qualified as UTxO
1313
import CardanoNode (NodeLog, withCardanoNodeDevnet)
14-
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO)
14+
import Control.Concurrent.Class.MonadSTM (modifyTVar', readTVarIO)
1515
import Control.Lens ((^?))
1616
import Data.Aeson as Aeson
1717
import Data.Aeson.Lens (key, _JSON, _String)
@@ -114,7 +114,7 @@ chainObserverSees observer txType headId =
114114

115115
awaitMatch :: HasCallStack => ChainObserverHandle -> DiffTime -> (Aeson.Value -> Maybe a) -> IO a
116116
awaitMatch chainObserverHandle delay f = do
117-
seenMsgs <- newTVarIO []
117+
seenMsgs <- newLabelledTVarIO "await-match-seen-msgs" []
118118
timeout delay (go seenMsgs) >>= \case
119119
Just x -> pure x
120120
Nothing -> do

hydra-cluster/test/Test/DirectChainSpec.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import CardanoClient (
1212
waitForUTxO,
1313
)
1414
import CardanoNode (NodeLog, withCardanoNodeDevnet)
15-
import Control.Concurrent.STM (newEmptyTMVarIO, takeTMVar)
15+
import Control.Concurrent.STM (takeTMVar)
1616
import Control.Concurrent.STM.TMVar (putTMVar)
1717
import Control.Lens ((<>~))
1818
import Data.List.Split (splitWhen)
@@ -548,7 +548,7 @@ withDirectChainTest tracer config party action = do
548548
_ -> failure $ "unexpected chainBackendOptions: " <> show chainBackendOptions
549549
otherConfig -> failure $ "unexpected chainConfig: " <> show otherConfig
550550
ctx <- loadChainContext backend configuration party
551-
eventMVar <- newEmptyTMVarIO
551+
eventMVar <- newLabelledEmptyTMVarIO "direct-chain-events"
552552

553553
let callback event = atomically $ putTMVar eventMVar event
554554

hydra-cluster/test/Test/EndToEndSpec.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -676,9 +676,9 @@ spec = around (showLogsOnFailure "EndToEndSpec") $ do
676676
withClusterTempDir $ \tmpDir -> do
677677
withBackend (contramap FromCardanoNode tracer) tmpDir $ \_ backend -> do
678678
hydraScriptsTxId <- publishHydraScriptsAs backend Faucet
679-
concurrently_
680-
(initAndClose tmpDir tracer 0 hydraScriptsTxId backend)
681-
(initAndClose tmpDir tracer 1 hydraScriptsTxId backend)
679+
concurrentlyLabelled_
680+
("init-and-close-0", initAndClose tmpDir tracer 0 hydraScriptsTxId backend)
681+
("init-and-close-1", initAndClose tmpDir tracer 1 hydraScriptsTxId backend)
682682

683683
it "alice inits a Head with incorrect keys preventing bob from observing InitTx" $ \tracer ->
684684
failAfter 60 $

0 commit comments

Comments
 (0)