@@ -450,23 +450,27 @@ pollConnectivity tracer conn advertise NetworkCallback{onConnectivity} = do
450
450
-- Write our alive key using lease
451
451
writeAlive leaseId
452
452
traceWith tracer CreatedLease {leaseId}
453
- withKeepAlive leaseId $ \ keepAlive ->
454
- forever $ do
455
- -- Keep our lease alive
456
- ttlRemaining <- keepAlive
457
- when (ttlRemaining < 1 ) $
458
- traceWith tracer LowLeaseTTL {ttlRemaining}
459
- -- Determine alive peers
460
- alive <- getAlive
461
- let othersAlive = alive \\ [advertise]
462
- seenAlive <- atomically $ swapTVar seenAliveVar othersAlive
463
- forM_ (othersAlive \\ seenAlive) $ onConnectivity . PeerConnected
464
- forM_ (seenAlive \\ othersAlive) $ onConnectivity . PeerDisconnected
465
- -- Wait roughly ttl / 2
466
- threadDelay (ttlRemaining / 2 )
453
+ withKeepAlive leaseId (aliveLoop seenAliveVar)
467
454
where
455
+ aliveLoop seenAliveVar keepAlive = do
456
+ -- Keep our lease alive
457
+ ttlRemaining <- keepAlive
458
+ if ttlRemaining <= 0
459
+ then
460
+ -- The keep alive did not work as no time to live remaining. Get a new lease instead
461
+ traceWith tracer LowLeaseTTL {ttlRemaining}
462
+ else do
463
+ -- Determine alive peers
464
+ alive <- getAlive
465
+ let othersAlive = alive \\ [advertise]
466
+ seenAlive <- atomically $ swapTVar seenAliveVar othersAlive
467
+ forM_ (othersAlive \\ seenAlive) $ onConnectivity . PeerConnected
468
+ forM_ (seenAlive \\ othersAlive) $ onConnectivity . PeerDisconnected
469
+ threadDelay 1
470
+ aliveLoop seenAliveVar keepAlive
471
+
468
472
onGrpcException seenAliveVar GrpcException {grpcError}
469
- | grpcError == GrpcUnavailable || grpcError == GrpcDeadlineExceeded = do
473
+ | grpcError `elem` [ GrpcUnavailable , GrpcDeadlineExceeded , GrpcCancelled ] = do
470
474
onConnectivity NetworkDisconnected
471
475
atomically $ writeTVar seenAliveVar []
472
476
threadDelay 1
@@ -483,7 +487,7 @@ pollConnectivity tracer conn advertise NetworkCallback{onConnectivity} = do
483
487
void . action $ do
484
488
send $ NextElem $ defMessage & # id .~ leaseId
485
489
recv >>= \ case
486
- NextElem res -> pure . fromIntegral $ res ^. # ttl
490
+ NextElem res -> pure $ res ^. # ttl
487
491
NoNextElem -> do
488
492
traceWith tracer NoKeepAliveResponse
489
493
pure 0
@@ -622,7 +626,7 @@ data EtcdLog
622
626
| FailedToDecodeLog { log :: Text , reason :: Text }
623
627
| FailedToDecodeValue { key :: Text , value :: Text , reason :: Text }
624
628
| CreatedLease { leaseId :: Int64 }
625
- | LowLeaseTTL { ttlRemaining :: DiffTime }
629
+ | LowLeaseTTL { ttlRemaining :: Int64 }
626
630
| NoKeepAliveResponse
627
631
| MatchingProtocolVersion { version :: ProtocolVersion }
628
632
deriving stock (Eq , Show , Generic )
0 commit comments