Skip to content

Commit b01dbb0

Browse files
committed
Fix for broadcast deadlock (#2171)
Continuing #2168 but now the tests and such will run. This work is by @jmagan but I had to make my own PR to satisfy our rules around PR execution 🥲.
1 parent e1f9505 commit b01dbb0

File tree

3 files changed

+66
-45
lines changed

3 files changed

+66
-45
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ changes.
3232
- Handle failing lease keep alive in network component and avoid bursts in
3333
heartbeating.
3434

35+
- Fix for blocking bug when broadcasting messages via etcd. See:
36+
https://github.com/cardano-scaling/hydra/issues/2167. This is not a full fix
37+
but is enough to resolve the problem until we can identify the central cause
38+
of the issue.
39+
3540
## [0.22.3] - 2025-07-21
3641

3742
* Change behavior of `Hydra.Network.Etcd` to fallback to earliest possible

hydra-node/src/Hydra/Network/Etcd.hs

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ import Hydra.Network (
7777
import Hydra.Network.EtcdBinary (getEtcdBinary)
7878
import Network.GRPC.Client (
7979
Address (..),
80-
CallParams (..),
8180
ConnParams (..),
8281
Connection,
8382
ReconnectPolicy (..),
@@ -87,7 +86,6 @@ import Network.GRPC.Client (
8786
TimeoutUnit (..),
8887
TimeoutValue (..),
8988
rpc,
90-
rpcWith,
9189
withConnection,
9290
)
9391
import Network.GRPC.Client.StreamType.IO (biDiStreaming, nonStreaming)
@@ -120,7 +118,6 @@ import System.Process.Typed (
120118
unsafeProcessHandle,
121119
waitExitCode,
122120
)
123-
import UnliftIO (readTVarIO)
124121

125122
-- | Concrete network component that broadcasts messages to an etcd cluster and
126123
-- listens for incoming messages.
@@ -140,50 +137,21 @@ withEtcdNetwork tracer protocolVersion config callback action = do
140137
withProcessInterrupt (etcdCmd etcdBinPath envVars) $ \p -> do
141138
race_ (waitExitCode p >>= \ec -> fail $ "Sub-process etcd exited with: " <> show ec) $ do
142139
race_ (traceStderr p callback) $ do
143-
-- XXX: cleanup reconnecting through policy if other threads fail
144-
doneVar <- newTVarIO False
145140
-- NOTE: The connection to the server is set up asynchronously; the
146141
-- first rpc call will block until the connection has been established.
147-
withConnection (connParams doneVar) grpcServer $ \conn -> do
142+
withConnection (connParams tracer Nothing) (grpcServer config) $ \conn -> do
148143
-- REVIEW: checkVersion blocks if used on main thread - why?
149144
withAsync (checkVersion tracer conn protocolVersion callback) $ \_ -> do
150145
race_ (pollConnectivity tracer conn advertise callback) $
151146
race_ (waitMessages tracer conn persistenceDir callback) $ do
152147
queue <- newPersistentQueue (persistenceDir </> "pending-broadcast") 100
153-
race_ (broadcastMessages tracer conn advertise queue) $ do
148+
race_ (broadcastMessages tracer config advertise queue) $ do
154149
action
155150
Network
156151
{ broadcast = writePersistentQueue queue
157152
}
158-
atomically (writeTVar doneVar True)
159153
where
160-
connParams doneVar =
161-
def
162-
{ connReconnectPolicy = reconnectPolicy doneVar
163-
, -- NOTE: Not rate limit pings to our trusted, local etcd node. See
164-
-- comment on 'http2OverridePingRateLimit'.
165-
connHTTP2Settings = defaultHTTP2Settings{http2OverridePingRateLimit = Just maxBound}
166-
}
167-
168-
reconnectPolicy doneVar = ReconnectAfter ReconnectToOriginal $ do
169-
done <- readTVarIO doneVar
170-
if done
171-
then pure DontReconnect
172-
else do
173-
threadDelay 1
174-
traceWith tracer Reconnecting
175-
pure $ reconnectPolicy doneVar
176-
177154
clientHost = Host{hostname = "127.0.0.1", port = getClientPort config}
178-
179-
grpcServer =
180-
ServerInsecure $
181-
Address
182-
{ addressHost = toString $ hostname clientHost
183-
, addressPort = port clientHost
184-
, addressAuthority = Nothing
185-
}
186-
187155
traceStderr p NetworkCallback{onConnectivity} =
188156
forever $ do
189157
bs <- BS.hGetLine (getStderr p)
@@ -243,6 +211,32 @@ withEtcdNetwork tracer protocolVersion config callback action = do
243211

244212
NetworkConfiguration{persistenceDir, listen, advertise, peers, whichEtcd} = config
245213

214+
connParams :: Tracer IO EtcdLog -> Maybe Timeout -> ConnParams
215+
connParams tracer to =
216+
def
217+
{ connReconnectPolicy = reconnectPolicy
218+
, -- NOTE: Not rate limit pings to our trusted, local etcd node. See
219+
-- comment on 'http2OverridePingRateLimit'.
220+
connHTTP2Settings = defaultHTTP2Settings{http2OverridePingRateLimit = Just maxBound}
221+
, connDefaultTimeout = to
222+
}
223+
where
224+
reconnectPolicy = ReconnectAfter ReconnectToOriginal $ do
225+
threadDelay 1
226+
traceWith tracer Reconnecting
227+
pure reconnectPolicy
228+
229+
grpcServer :: NetworkConfiguration -> Server
230+
grpcServer config =
231+
ServerInsecure $
232+
Address
233+
{ addressHost = toString $ hostname clientHost
234+
, addressPort = port clientHost
235+
, addressAuthority = Nothing
236+
}
237+
where
238+
clientHost = Host{hostname = "127.0.0.1", port = getClientPort config}
239+
246240
-- | Get the client port corresponding to a listen address.
247241
--
248242
-- The client port used by the started etcd port is offset by the same amount as
@@ -314,15 +308,15 @@ checkVersion tracer conn ourVersion NetworkCallback{onConnectivity} = do
314308
broadcastMessages ::
315309
(ToCBOR msg, Eq msg) =>
316310
Tracer IO EtcdLog ->
317-
Connection ->
311+
NetworkConfiguration ->
318312
-- | Used to identify sender.
319313
Host ->
320314
PersistentQueue IO msg ->
321315
IO ()
322-
broadcastMessages tracer conn ourHost queue =
316+
broadcastMessages tracer config ourHost queue =
323317
withGrpcContext "broadcastMessages" . forever $ do
324318
msg <- peekPersistentQueue queue
325-
(putMessage conn ourHost msg >> popPersistentQueue queue msg)
319+
(putMessage tracer config ourHost msg >> popPersistentQueue queue msg)
326320
`catch` \case
327321
GrpcException{grpcError, grpcErrorMessage}
328322
| grpcError == GrpcUnavailable || grpcError == GrpcDeadlineExceeded -> do
@@ -333,19 +327,18 @@ broadcastMessages tracer conn ourHost queue =
333327
-- | Broadcast a message to the etcd cluster.
334328
putMessage ::
335329
ToCBOR msg =>
336-
Connection ->
330+
Tracer IO EtcdLog ->
331+
NetworkConfiguration ->
337332
-- | Used to identify sender.
338333
Host ->
339334
msg ->
340335
IO ()
341-
putMessage conn ourHost msg =
342-
void $ nonStreaming conn (rpcWith @(Protobuf KV "put") callParams) req
336+
putMessage tracer config ourHost msg = do
337+
-- XXX: Here we open a new connection _for every message_! This is
338+
-- effectively a work-around for https://github.com/cardano-scaling/hydra/issues/2167.
339+
withConnection (connParams tracer (Just . Timeout Second $ TimeoutValue 3)) (grpcServer config) $ \conn -> do
340+
void $ nonStreaming conn (rpc @(Protobuf KV "put")) req
343341
where
344-
-- NOTE: Timeout puts after 3 seconds. This is not tested, but we saw the
345-
-- 'pending-broadcast' queue fill up and suspect that 'put' requests in
346-
-- 'broadcastMessages' were just not served and stay pending forever.
347-
callParams = def{callTimeout = Just . Timeout Second $ TimeoutValue 3}
348-
349342
req =
350343
defMessage
351344
& #key .~ key

hydra-node/test/Hydra/NetworkSpec.hs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,29 @@ spec = do
6363
broadcast n ("asdf" :: Text)
6464
waitNext `shouldReturn` "asdf"
6565

66+
-- Note: This test is disabled as it takes took long; but it is
67+
-- important to keep around. Successfully completion of this test looks
68+
-- like either a "mvcc database size exceeded" error; or no error at
69+
-- all. Failures looks like complete blocking
70+
-- XXX: Maybe run this one nightly; when we start doing nightly tests.
71+
xit "broadcasts 100KiB messages 1M times" $ \tracer ->
72+
withTempDir "test-etcd" $ \tmp -> do
73+
putStrLn $ "Folder " ++ show tmp
74+
PeerConfig2{aliceConfig, bobConfig} <- setup2Peers tmp
75+
(recordReceived, waitNext, _) <- newRecordingCallback
76+
-- Create a 100KiB message (100 * 1024 characters)
77+
let largeMessage = toText $ replicate (100 * 1024) 'a'
78+
withEtcdNetwork @Text tracer v1 aliceConfig recordReceived $ \n1 -> do
79+
withEtcdNetwork @Text tracer v1 bobConfig noopCallback $ \_ -> do
80+
forM_ [1 :: Integer .. 1000000] $ \i -> do
81+
let msgWithId = largeMessage <> " - Message #" <> show i
82+
when (i `mod` 10000 == 0) $
83+
putStrLn $
84+
"Broadcasting 100KiB message #" <> show i <> " (size: " <> show (length (toString msgWithId)) <> " chars)"
85+
broadcast n1 msgWithId
86+
_ <- waitNext
87+
threadDelay 0.02
88+
6689
it "broadcasts messages to single connected peer" $ \tracer -> do
6790
withTempDir "test-etcd" $ \tmp -> do
6891
failAfter 5 $ do

0 commit comments

Comments
 (0)