20
20
-- only deliver messages that were not seen before. In case we are not connected
21
21
-- to our 'etcd' instance or not enough peers (= on a minority cluster), we
22
22
-- retry sending, but also store messages to broadcast in a 'PersistentQueue',
23
- -- which makes the node resilient against crashes while sending. TODO: Is this
24
- -- needed? performance limitation?
23
+ -- which makes the node resilient against crashes while sending.
25
24
--
26
25
-- Connectivity and compatibility with other nodes on the cluster is tracked
27
26
-- using the key-value service as well:
@@ -93,7 +92,6 @@ import Network.GRPC.Client (
93
92
)
94
93
import Network.GRPC.Client.StreamType.IO (biDiStreaming , nonStreaming )
95
94
import Network.GRPC.Common (GrpcError (.. ), GrpcException (.. ), HTTP2Settings (.. ), NextElem (.. ), def , defaultHTTP2Settings )
96
- import Network.GRPC.Common.NextElem (whileNext_ )
97
95
import Network.GRPC.Common.Protobuf (Proto (.. ), Protobuf , defMessage , (.~) )
98
96
import Network.GRPC.Etcd (
99
97
Compare'CompareResult (.. ),
@@ -102,6 +100,7 @@ import Network.GRPC.Etcd (
102
100
Lease ,
103
101
Watch ,
104
102
)
103
+ import Network.Socket (PortNumber )
105
104
import System.Directory (createDirectoryIfMissing , listDirectory , removeFile )
106
105
import System.Environment.Blank (getEnvironment )
107
106
import System.FilePath ((</>) )
@@ -175,7 +174,7 @@ withEtcdNetwork tracer protocolVersion config callback action = do
175
174
traceWith tracer Reconnecting
176
175
pure $ reconnectPolicy doneVar
177
176
178
- clientHost = Host {hostname = " 127.0.0.1" , port = clientPort }
177
+ clientHost = Host {hostname = " 127.0.0.1" , port = getClientPort config }
179
178
180
179
grpcServer =
181
180
ServerInsecure $
@@ -185,11 +184,6 @@ withEtcdNetwork tracer protocolVersion config callback action = do
185
184
, addressAuthority = Nothing
186
185
}
187
186
188
- -- NOTE: Offset client port by the same amount as configured 'port' is offset
189
- -- from the default '5001'. This will result in the default client port 2379
190
- -- be used by default still.
191
- clientPort = 2379 + port listen - 5001
192
-
193
187
traceStderr p NetworkCallback {onConnectivity} =
194
188
forever $ do
195
189
bs <- BS. hGetLine (getStderr p)
@@ -249,6 +243,14 @@ withEtcdNetwork tracer protocolVersion config callback action = do
249
243
250
244
NetworkConfiguration {persistenceDir, listen, advertise, peers, whichEtcd} = config
251
245
246
+ -- | Get the client port corresponding to a listen address.
247
+ --
248
+ -- The client port used by the started etcd instance is offset by the same amount as
249
+ -- the listen port is offset from the default port 5001. This results in the
250
+ -- default client port 2379 still being used for the default listen port.
251
+ getClientPort :: NetworkConfiguration -> PortNumber
252
+ getClientPort NetworkConfiguration {listen} = 2379 + port listen - 5001
253
+
252
254
-- | Check and write version on etcd cluster. This will retry until we are on a
253
255
-- majority cluster and succeed. If the version does not match, a corresponding
254
256
-- 'Connectivity' message is sent via 'NetworkCallback'.
@@ -282,8 +284,7 @@ checkVersion tracer conn ourVersion NetworkCallback{onConnectivity} = do
282
284
Right theirVersion ->
283
285
unless (theirVersion == ourVersion) $
284
286
onConnectivity VersionMismatch {ourVersion, theirVersion = Just theirVersion}
285
- else
286
- traceWith tracer $ MatchingProtocolVersion {version = ourVersion}
287
+ else traceWith tracer $ MatchingProtocolVersion {version = ourVersion}
287
288
where
288
289
versionKey = " version"
289
290
@@ -361,11 +362,13 @@ waitMessages ::
361
362
NetworkCallback msg IO ->
362
363
IO ()
363
364
waitMessages tracer conn directory NetworkCallback {deliver} = do
364
- revision <- getLastKnownRevision directory
365
365
withGrpcContext " waitMessages" . forever $ do
366
366
-- NOTE: We have not observed the watch (subscription) fail even when peers
367
367
-- leave and we end up on a minority cluster.
368
368
biDiStreaming conn (rpc @ (Protobuf Watch " watch" )) $ \ send recv -> do
369
+ revision <- getLastKnownRevision directory
370
+ let startRevision = fromIntegral (revision + 1 )
371
+ traceWith tracer WatchMessagesStartRevision {startRevision}
369
372
-- NOTE: Request all keys starting with 'msg'. See also section KeyRanges
370
373
-- in https://etcd.io/docs/v3.5/learning/api/#key-value-api
371
374
let watchRequest =
@@ -374,34 +377,48 @@ waitMessages tracer conn directory NetworkCallback{deliver} = do
374
377
& # rangeEnd .~ " msh" -- NOTE: g+1 to query prefixes
375
378
& # startRevision .~ fromIntegral (revision + 1 )
376
379
send . NextElem $ defMessage & # createRequest .~ watchRequest
377
- whileNext_ recv process
380
+ loop send recv
378
381
-- Wait before re-trying
379
382
threadDelay 1
380
383
where
381
- process res = do
382
- let revision = fromIntegral $ res ^. # header . # revision
383
- putLastKnownRevision directory revision
384
- forM_ (res ^. # events) $ \ event -> do
385
- let value = event ^. # kv . # value
386
- case decodeFull' value of
387
- Left err ->
388
- traceWith
389
- tracer
390
- FailedToDecodeValue
391
- { key = decodeUtf8 $ event ^. # kv . # key
392
- , value = encodeBase16 value
393
- , reason = show err
394
- }
395
- Right msg -> deliver msg
384
+ loop send recv =
385
+ recv >>= \ case
386
+ NoNextElem -> pure ()
387
+ NextElem res ->
388
+ if res ^. # canceled
389
+ then do
390
+ let compactRevision = res ^. # compactRevision
391
+ traceWith tracer WatchMessagesFallbackTo {compactRevision}
392
+ putLastKnownRevision directory . fromIntegral $ (compactRevision - 1 ) `max` 0
393
+ -- Gracefully close watch stream
394
+ send NoNextElem
395
+ else do
396
+ let revision = res ^. # header . # revision
397
+ putLastKnownRevision directory . fromIntegral $ revision `max` 0
398
+ forM_ (res ^. # events) process
399
+ loop send recv
400
+
401
+ process event = do
402
+ let value = event ^. # kv . # value
403
+ case decodeFull' value of
404
+ Left err ->
405
+ traceWith
406
+ tracer
407
+ FailedToDecodeValue
408
+ { key = decodeUtf8 $ event ^. # kv . # key
409
+ , value = encodeBase16 value
410
+ , reason = show err
411
+ }
412
+ Right msg -> deliver msg
396
413
397
414
getLastKnownRevision :: MonadIO m => FilePath -> m Natural
398
415
getLastKnownRevision directory = do
399
416
liftIO $
400
417
try (decodeFileStrict' $ directory </> " last-known-revision" ) >>= \ case
401
418
Right rev -> do
402
- pure $ fromMaybe 1 rev
419
+ pure $ fromMaybe 0 rev
403
420
Left (e :: IOException )
404
- | isDoesNotExistError e -> pure 1
421
+ | isDoesNotExistError e -> pure 0
405
422
| otherwise -> do
406
423
fail $ " Failed to load last known revision: " <> show e
407
424
@@ -614,5 +631,7 @@ data EtcdLog
614
631
| LowLeaseTTL { ttlRemaining :: Int64 }
615
632
| NoKeepAliveResponse
616
633
| MatchingProtocolVersion { version :: ProtocolVersion }
634
+ | WatchMessagesStartRevision { startRevision :: Int64 }
635
+ | WatchMessagesFallbackTo { compactRevision :: Int64 }
617
636
deriving stock (Eq , Show , Generic )
618
637
deriving anyclass (ToJSON , FromJSON )
0 commit comments