Skip to content

Commit 6d41954

Browse files
committed
Use dup channel to listen to events
1 parent 58ef107 commit 6d41954

File tree

2 files changed

+31
-19
lines changed

2 files changed

+31
-19
lines changed

hydra-node/src/Hydra/API/HTTPServer.hs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module Hydra.API.HTTPServer where
55
import Hydra.Prelude
66

77
import Cardano.Ledger.Core (PParams)
8-
import Control.Concurrent.STM (TChan, readTChan)
8+
import Control.Concurrent.STM (TChan, dupTChan, readTChan)
99
import Data.Aeson (KeyValue ((.=)), object, withObject, (.:))
1010
import Data.Aeson qualified as Aeson
1111
import Data.ByteString.Lazy qualified as LBS
@@ -387,14 +387,17 @@ handleSubmitHydraTx putClientInput apiTransactionTimeout responseChannel body =
387387
Left err ->
388388
pure $ responseLBS status400 jsonContent (Aeson.encode $ Aeson.String $ pack err)
389389
Right SubmitHydraTxRequest{submitHydraTx} -> do
390+
-- Duplicate the channel to avoid consuming messages from other consumers.
391+
dupChannel <- atomically $ dupTChan responseChannel
392+
390393
-- Submit the transaction to the head
391394
putClientInput (NewTx submitHydraTx)
392395

393396
let txid = txId submitHydraTx
394397
result <-
395398
timeout
396399
(realToFrac (apiTransactionTimeoutNominalDiffTime apiTransactionTimeout))
397-
(waitForTransactionResult txid)
400+
(waitForTransactionResult dupChannel txid)
398401

399402
case result of
400403
Just (SubmitTxConfirmed snapshotNumber) ->
@@ -417,11 +420,11 @@ handleSubmitHydraTx putClientInput apiTransactionTimeout responseChannel body =
417420
)
418421
where
419422
-- Wait for transaction result by listening to events
420-
waitForTransactionResult :: TxIdType tx -> IO SubmitHydraTxResponse
421-
waitForTransactionResult txid = go
423+
waitForTransactionResult :: TChan (Either (TimedServerOutput tx) (ClientMessage tx)) -> TxIdType tx -> IO SubmitHydraTxResponse
424+
waitForTransactionResult dupChannel txid = go
422425
where
423426
go = do
424-
event <- atomically $ readTChan responseChannel
427+
event <- atomically $ readTChan dupChannel
425428
case event of
426429
Left (TimedServerOutput{output}) -> case output of
427430
TxValid{transactionId}

hydra-node/test/Hydra/API/HTTPServerSpec.hs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,8 @@ apiServerSpec = do
210210
getPendingDeposits = pure []
211211
putClientInput = const (pure ())
212212
getHeadState = pure inIdleState
213-
responseChannelSimple <- runIO newTChanIO
214-
responseChannel <- runIO newTChanIO
215213
describe "GET /protocol-parameters" $ do
214+
responseChannel <- runIO newTChanIO
216215
with
217216
( return $
218217
httpApp @SimpleTx
@@ -244,6 +243,7 @@ apiServerSpec = do
244243
}
245244

246245
describe "GET /head" $ do
246+
responseChannel <- runIO newTChanIO
247247
prop "responds correctly" $ \headState -> do
248248
withApplication
249249
( httpApp @SimpleTx
@@ -261,6 +261,7 @@ apiServerSpec = do
261261
$ do
262262
get "/head"
263263
`shouldRespondWith` 200{matchBody = matchJSON headState}
264+
responseChannelSimpleTx <- runIO newTChanIO
264265
prop "ok response matches schema" $ \headState -> do
265266
let isIdle = case headState of
266267
Idle{} -> True
@@ -292,7 +293,7 @@ apiServerSpec = do
292293
getPendingDeposits
293294
putClientInput
294295
300
295-
responseChannelSimple
296+
responseChannelSimpleTx
296297
)
297298
$ do
298299
get "/head"
@@ -303,6 +304,7 @@ apiServerSpec = do
303304
(key "channels" . key "/head" . key "subscribe" . key "message")
304305
}
305306
describe "GET /snapshot/last-seen" $ do
307+
responseChannel <- runIO newTChanIO
306308
prop "responds correctly" $ \headState -> do
307309
let seenSnapshot :: SeenSnapshot SimpleTx = getSeenSnapshot headState
308310
withApplication
@@ -322,6 +324,7 @@ apiServerSpec = do
322324
get "/snapshot/last-seen"
323325
`shouldRespondWith` 200{matchBody = matchJSON seenSnapshot}
324326
describe "GET /snapshot" $ do
327+
responseChannel <- runIO newTChanIO
325328
prop "responds correctly" $ \headState -> do
326329
let confirmedSnapshot :: Maybe (ConfirmedSnapshot SimpleTx) = getConfirmedSnapshot headState
327330
withApplication
@@ -342,7 +345,8 @@ apiServerSpec = do
342345
`shouldRespondWith` case confirmedSnapshot of
343346
Nothing -> 404
344347
Just confirmedSn -> 200{matchBody = matchJSON confirmedSn}
345-
prop "ok response matches schema" $ \(closedState :: ClosedState tx) ->
348+
responseChannelSimpleTx <- runIO newTChanIO
349+
prop "ok response matches schema" $ \(closedState :: ClosedState tx) -> do
346350
withMaxSuccess 4
347351
. withJsonSpecifications
348352
$ \schemaDir -> do
@@ -357,7 +361,7 @@ apiServerSpec = do
357361
getPendingDeposits
358362
putClientInput
359363
300
360-
responseChannelSimple
364+
responseChannelSimpleTx
361365
)
362366
$ do
363367
get "/snapshot"
@@ -369,6 +373,7 @@ apiServerSpec = do
369373
}
370374

371375
describe "POST /snapshot" $ do
376+
responseChannel <- runIO newTChanIO
372377
prop "responds on valid requests" $ \(request :: SideLoadSnapshotRequest Tx, headState) -> do
373378
withMaxSuccess 10
374379
. withApplication
@@ -382,13 +387,14 @@ apiServerSpec = do
382387
getPendingDeposits
383388
putClientInput
384389
300
385-
responseChannelSimple
390+
responseChannel
386391
)
387392
$ do
388393
post "/snapshot" (Aeson.encode request)
389394
`shouldRespondWith` 200
390395

391396
describe "GET /snapshot/utxo" $ do
397+
responseChannel <- runIO newTChanIO
392398
prop "responds correctly" $ \headState -> do
393399
let utxo :: Maybe (UTxOType SimpleTx) = getSnapshotUtxo headState
394400
withApplication
@@ -409,6 +415,7 @@ apiServerSpec = do
409415
`shouldRespondWith` case utxo of
410416
Nothing -> 404
411417
Just u -> 200{matchBody = matchJSON u}
418+
responseChannelSimpleTx <- runIO newTChanIO
412419
prop "ok response matches schema" $ \headState -> do
413420
let mUTxO = getSnapshotUtxo headState
414421
utxo :: UTxOType Tx = fromMaybe mempty mUTxO
@@ -428,7 +435,7 @@ apiServerSpec = do
428435
getPendingDeposits
429436
putClientInput
430437
300
431-
responseChannelSimple
438+
responseChannelSimpleTx
432439
)
433440
$ do
434441
get "/snapshot/utxo"
@@ -466,7 +473,7 @@ apiServerSpec = do
466473
getPendingDeposits
467474
putClientInput
468475
300
469-
responseChannelSimple
476+
responseChannelSimpleTx
470477
)
471478
$ do
472479
get "/snapshot/utxo"
@@ -487,6 +494,7 @@ apiServerSpec = do
487494
}
488495
let initialHeadState = Initial (generateWith arbitrary 42)
489496
let openHeadState = Open (generateWith arbitrary 42)
497+
responseChannel <- runIO newTChanIO
490498
prop "responds on valid requests" $ \(request :: DraftCommitTxRequest Tx) ->
491499
withApplication
492500
( httpApp
@@ -499,7 +507,7 @@ apiServerSpec = do
499507
getPendingDeposits
500508
putClientInput
501509
300
502-
responseChannelSimple
510+
responseChannel
503511
)
504512
$ do
505513
post "/commit" (Aeson.encode request)
@@ -542,7 +550,7 @@ apiServerSpec = do
542550
getPendingDeposits
543551
putClientInput
544552
300
545-
responseChannelSimple
553+
responseChannel
546554
)
547555
$ do
548556
post "/commit" (Aeson.encode (request :: DraftCommitTxRequest Tx))
@@ -560,6 +568,7 @@ apiServerSpec = do
560568
now <- runIO getCurrentTime
561569

562570
prop "returns 202 Accepted on timeout" $ do
571+
responseChannel <- newTChanIO
563572
withApplication
564573
( httpApp @SimpleTx
565574
nullTracer
@@ -577,6 +586,7 @@ apiServerSpec = do
577586
post "/transaction" (mkReq testTx) `shouldRespondWith` 202
578587

579588
prop "returns 200 OK on confirmed snapshot" $ do
589+
responseChannel <- newTChanIO
580590
let snapshot =
581591
Snapshot
582592
{ headId = testHeadId
@@ -593,7 +603,6 @@ apiServerSpec = do
593603
, seq = 0
594604
, time = now
595605
}
596-
_ <- atomically $ writeTChan responseChannel (Left event)
597606
withApplication
598607
( httpApp @SimpleTx
599608
nullTracer
@@ -603,14 +612,15 @@ apiServerSpec = do
603612
(pure inIdleState)
604613
(pure CannotCommit)
605614
(pure [])
606-
(const $ pure ())
615+
(const $ atomically $ writeTChan responseChannel (Left event))
607616
10
608617
responseChannel
609618
)
610619
$ do
611620
post "/transaction" (mkReq testTx) `shouldRespondWith` 200
612621

613622
prop "returns 400 Bad Request on invalid tx" $ do
623+
responseChannel <- newTChanIO
614624
let validationError = ValidationError "some error"
615625
event =
616626
TimedServerOutput
@@ -624,7 +634,6 @@ apiServerSpec = do
624634
, seq = 0
625635
, time = now
626636
}
627-
_ <- atomically $ writeTChan responseChannel (Left event)
628637
withApplication
629638
( httpApp @SimpleTx
630639
nullTracer
@@ -634,7 +643,7 @@ apiServerSpec = do
634643
(pure inIdleState)
635644
(pure CannotCommit)
636645
(pure [])
637-
(const $ pure ())
646+
(const $ atomically $ writeTChan responseChannel (Left event))
638647
10
639648
responseChannel
640649
)

0 commit comments

Comments
 (0)