Skip to content

Commit 312a617

Browse files
committed
Fix blocking of newPersistentQueue (#2147)
See copilot summary (let's give this a spin) --- * [x] CHANGELOG updated * [x] Documentation update not needed * [x] Haddocks update not needed * [ ] No new TODOs introduced - One FIXME which covers the still unknown part of why the queue was not cleared
1 parent a17249d commit 312a617

File tree

5 files changed

+93
-39
lines changed

5 files changed

+93
-39
lines changed

hydra-node/hydra-node.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ library
8787
Hydra.Network
8888
Hydra.Network.Authenticate
8989
Hydra.Network.Etcd
90+
Hydra.Network.EtcdBinary
9091
Hydra.Network.Message
9192
Hydra.NetworkVersions
9293
Hydra.Node
@@ -354,6 +355,7 @@ test-suite tests
354355
Hydra.OptionsSpec
355356
Hydra.PartySpec
356357
Hydra.PersistenceSpec
358+
Hydra.PersistentQueueSpec
357359
Hydra.UtilsSpec
358360
Paths_hydra_node
359361
Spec

hydra-node/src/Hydra/Network.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ data NetworkCallback msg m = NetworkCallback
5353
-- A `NetworkComponent` can have different inbound and outbound message types.
5454
type NetworkComponent m inbound outbound a = NetworkCallback inbound m -> (Network m outbound -> m a) -> m a
5555

56+
-- * Types used by concrete implementations
57+
5658
data WhichEtcd = EmbeddedEtcd | SystemEtcd
5759
deriving stock (Eq, Show, Generic)
5860
deriving anyclass (ToJSON, FromJSON)
@@ -61,8 +63,6 @@ instance Arbitrary WhichEtcd where
6163
shrink = genericShrink
6264
arbitrary = genericArbitrary
6365

64-
-- * Types used by concrete implementations
65-
6666
-- | Configuration for a `Node` network layer.
6767
data NetworkConfiguration = NetworkConfiguration
6868
{ persistenceDir :: FilePath

hydra-node/src/Hydra/Network/Etcd.hs

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
{-# LANGUAGE OverloadedLabels #-}
22
{-# LANGUAGE OverloadedStrings #-}
3-
{-# LANGUAGE TemplateHaskell #-}
4-
{-# OPTIONS_GHC -Wno-deferred-out-of-scope-variables #-}
53

64
-- | Implements a Hydra network component using [etcd](https://etcd.io/).
75
--
@@ -61,7 +59,6 @@ import Data.Aeson (decodeFileStrict', encodeFile)
6159
import Data.Aeson qualified as Aeson
6260
import Data.Aeson.Lens qualified as Aeson
6361
import Data.Aeson.Types (Value)
64-
import Data.Bits ((.|.))
6562
import Data.ByteString qualified as BS
6663
import Data.ByteString.Char8 qualified as BS8
6764
import Data.List ((\\))
@@ -77,9 +74,8 @@ import Hydra.Network (
7774
NetworkComponent,
7875
NetworkConfiguration (..),
7976
ProtocolVersion,
80-
WhichEtcd (..),
8177
)
82-
import Hydra.Node.EmbedTH (embedExecutable)
78+
import Hydra.Network.EtcdBinary (getEtcdBinary)
8379
import Network.GRPC.Client (
8480
Address (..),
8581
CallParams (..),
@@ -108,9 +104,8 @@ import Network.GRPC.Etcd (
108104
)
109105
import System.Directory (createDirectoryIfMissing, listDirectory, removeFile)
110106
import System.Environment.Blank (getEnvironment)
111-
import System.FilePath (takeDirectory, (</>))
107+
import System.FilePath ((</>))
112108
import System.IO.Error (isDoesNotExistError)
113-
import System.Posix (ownerExecuteMode, ownerReadMode, ownerWriteMode, setFileMode)
114109
import System.Process (interruptProcessGroupOf)
115110
import System.Process.Typed (
116111
Process,
@@ -254,21 +249,6 @@ withEtcdNetwork tracer protocolVersion config callback action = do
254249

255250
NetworkConfiguration{persistenceDir, listen, advertise, peers, whichEtcd} = config
256251

257-
-- | Return the path of the etcd binary. Will either install it first, or just
258-
-- assume there is one available on the system path.
259-
getEtcdBinary :: FilePath -> WhichEtcd -> IO FilePath
260-
getEtcdBinary _ SystemEtcd = pure "etcd"
261-
getEtcdBinary persistenceDir EmbeddedEtcd =
262-
let path = persistenceDir </> "bin" </> "etcd"
263-
in installEtcd path >> pure path
264-
265-
-- | Install the embedded 'etcd' binary to given file path.
266-
installEtcd :: FilePath -> IO ()
267-
installEtcd fp = do
268-
createDirectoryIfMissing True (takeDirectory fp)
269-
BS.writeFile fp $(embedExecutable "etcd")
270-
setFileMode fp (ownerReadMode .|. ownerWriteMode .|. ownerExecuteMode)
271-
272252
-- | Check and write version on etcd cluster. This will retry until we are on a
273253
-- majority cluster and succeed. If the version does not match a corresponding
274254
-- 'Connectivity' message is sent via 'NetworkCallback'.
@@ -563,29 +543,31 @@ newPersistentQueue ::
563543
Natural ->
564544
m (PersistentQueue m a)
565545
newPersistentQueue path capacity = do
566-
queue <- newTBQueueIO capacity
546+
paths <- liftIO $ do
547+
createDirectoryIfMissing True path
548+
sort . mapMaybe readMaybe <$> listDirectory path
549+
queue <- newTBQueueIO $ max (fromIntegral $ length paths) capacity
567550
highestId <-
568-
try (loadExisting queue) >>= \case
551+
try (loadExisting queue paths) >>= \case
569552
Left (_ :: IOException) -> do
553+
-- XXX: This swallows and not logs the error
570554
liftIO $ createDirectoryIfMissing True path
571555
pure 0
572556
Right highest -> pure highest
573557
nextIx <- newTVarIO $ highestId + 1
574558
pure PersistentQueue{queue, nextIx, directory = path}
575559
where
576-
loadExisting queue = do
577-
paths <- liftIO $ listDirectory path
578-
case sort $ mapMaybe readMaybe paths of
579-
[] -> pure 0
580-
idxs -> do
581-
forM_ idxs $ \(idx :: Natural) -> do
582-
bs <- readFileBS (path </> show idx)
583-
case decodeFull' bs of
584-
Left err ->
585-
fail $ "Failed to decode item: " <> show err
586-
Right item ->
587-
atomically $ writeTBQueue queue (idx, item)
588-
pure $ List.last idxs
560+
loadExisting queue = \case
561+
[] -> pure 0
562+
idxs -> do
563+
forM_ idxs $ \(idx :: Natural) -> do
564+
bs <- readFileBS (path </> show idx)
565+
case decodeFull' bs of
566+
Left err ->
567+
fail $ "Failed to decode item: " <> show err
568+
Right item ->
569+
atomically $ writeTBQueue queue (idx, item)
570+
pure $ List.last idxs
589571

590572
-- | Write a value to the queue, blocking if the queue is full.
591573
writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
@@ -595,6 +577,7 @@ writePersistentQueue PersistentQueue{queue, nextIx, directory} item = do
595577
modifyTVar' nextIx (+ 1)
596578
pure next
597579
writeFileBS (directory </> show next) $ serialize' item
580+
-- XXX: We should trace when the queue is full
598581
atomically $ writeTBQueue queue (next, item)
599582

600583
-- | Get the next value from the queue without removing it, blocking if the
@@ -610,6 +593,8 @@ popPersistentQueue PersistentQueue{queue, directory} item = do
610593
popped <- atomically $ do
611594
(ix, next) <- peekTBQueue queue
612595
if next == item
596+
-- FIXME: why would we not call this? We saw the persistent queue reach
597+
-- capacity and writing blocked while nothing seemed to clear it.
613598
then readTBQueue queue $> Just ix
614599
else pure Nothing
615600
case popped of
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{-# LANGUAGE TemplateHaskell #-}
2+
3+
-- | Embedding and installation of 'etcd' binary. In a dedicated module as HLS
4+
-- tends to choke on $(embedExecutable "etcd").
5+
module Hydra.Network.EtcdBinary where
6+
7+
import Hydra.Prelude
8+
9+
import Data.Bits ((.|.))
10+
import Hydra.Network (WhichEtcd (..))
11+
import Hydra.Node.EmbedTH (embedExecutable)
12+
import System.Directory (createDirectoryIfMissing)
13+
import System.FilePath (takeDirectory, (</>))
14+
import System.Posix (ownerExecuteMode, ownerReadMode, ownerWriteMode, setFileMode)
15+
16+
-- | Return the path of the etcd binary. Will either install it first, or just
17+
-- assume there is one available on the system path.
18+
getEtcdBinary :: FilePath -> WhichEtcd -> IO FilePath
19+
getEtcdBinary _ SystemEtcd = pure "etcd"
20+
getEtcdBinary persistenceDir EmbeddedEtcd =
21+
let path = persistenceDir </> "bin" </> "etcd"
22+
in installEtcd path >> pure path
23+
24+
-- | Install the embedded 'etcd' binary to given file path.
25+
installEtcd :: FilePath -> IO ()
26+
installEtcd fp = do
27+
createDirectoryIfMissing True (takeDirectory fp)
28+
writeFileBS fp $(embedExecutable "etcd")
29+
setFileMode fp (ownerReadMode .|. ownerWriteMode .|. ownerExecuteMode)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
-- | Tests of the 'PersistentQueue'.
2+
module Hydra.PersistentQueueSpec where
3+
4+
import Hydra.Prelude
5+
import Test.Hydra.Prelude
6+
7+
import Hydra.Network.Etcd (newPersistentQueue, peekPersistentQueue, writePersistentQueue)
8+
import Test.QuickCheck (counterexample, generate, ioProperty)
9+
10+
spec :: Spec
11+
spec = do
12+
it "can be constructed" $ do
13+
capacity <- generate arbitrary
14+
withTempDir "persistent-queue" $ \dir -> do
15+
void $ newPersistentQueue @_ @Int dir capacity
16+
17+
prop "is persistent with capacity" $ \(items :: [Int]) -> do
18+
let capacity = fromIntegral $ length items
19+
counterexample ("capacity: " <> show capacity) $
20+
ioProperty $
21+
withTempDir "persistent-queue" $ \dir -> do
22+
q <- newPersistentQueue dir capacity
23+
shouldNotBlock_ $ mapM (writePersistentQueue q) items
24+
-- This is expected to block as we reached capacity
25+
_ <- timeout 0.01 (writePersistentQueue q 123)
26+
-- A new queue should be initialized with all the elements
27+
q2 <- shouldNotBlock $ newPersistentQueue @_ @Int dir capacity
28+
let expected = maybe 123 head (nonEmpty items)
29+
peekPersistentQueue q2 `shouldReturn` expected
30+
31+
shouldNotBlock :: HasCallStack => IO a -> IO a
32+
shouldNotBlock action = do
33+
timeout 0.1 action >>= \case
34+
Nothing -> failure "blocked unexpectedly"
35+
Just a -> pure a
36+
37+
shouldNotBlock_ :: HasCallStack => IO a -> IO ()
38+
shouldNotBlock_ = shouldNotBlock . void

0 commit comments

Comments
 (0)