@@ -56,9 +56,10 @@ import Control.Concurrent.Class.MonadSTM (
56
56
writeTVar ,
57
57
)
58
58
import Control.Exception (IOException )
59
- import Control.Lens ((^.) , (^..) )
59
+ import Control.Lens ((^.) , (^..) , (^?) )
60
60
import Data.Aeson (decodeFileStrict' , encodeFile )
61
61
import Data.Aeson qualified as Aeson
62
+ import Data.Aeson.Lens qualified as Aeson
62
63
import Data.Aeson.Types (Value )
63
64
import Data.Bits ((.|.) )
64
65
import Data.ByteString qualified as BS
@@ -138,12 +139,12 @@ withEtcdNetwork tracer protocolVersion config callback action = do
138
139
envVars <- Map. fromList <$> getEnvironment
139
140
withProcessInterrupt (etcdCmd etcdBinPath envVars) $ \ p -> do
140
141
race_ (waitExitCode p >>= \ ec -> fail $ " Sub-process etcd exited with: " <> show ec) $ do
141
- race_ (traceStderr p) $ do
142
+ race_ (traceStderr p callback ) $ do
142
143
-- XXX: cleanup reconnecting through policy if other threads fail
143
144
doneVar <- newTVarIO False
144
145
-- NOTE: The connection to the server is set up asynchronously; the
145
146
-- first rpc call will block until the connection has been established.
146
- withConnection (connParams doneVar) grpcServer $ \ conn ->
147
+ withConnection (connParams doneVar) grpcServer $ \ conn -> do
147
148
-- REVIEW: checkVersion blocks if used on main thread - why?
148
149
withAsync (checkVersion tracer conn protocolVersion callback) $ \ _ -> do
149
150
race_ (pollConnectivity tracer conn advertise callback) $
@@ -188,12 +189,22 @@ withEtcdNetwork tracer protocolVersion config callback action = do
188
189
-- be used by default still.
189
190
clientPort = 2379 + port listen - 5001
190
191
191
- traceStderr p =
192
+ traceStderr p NetworkCallback {onConnectivity} =
192
193
forever $ do
193
194
bs <- BS. hGetLine (getStderr p)
194
195
case Aeson. eitherDecodeStrict bs of
195
196
Left err -> traceWith tracer FailedToDecodeLog {log = decodeUtf8 bs, reason = show err}
196
- Right v -> traceWith tracer $ EtcdLog {etcd = v}
197
+ Right v -> do
198
+ let expectedClusterMismatch = do
199
+ level' <- bs ^? Aeson. key " level" . Aeson. nonNull
200
+ msg' <- bs ^? Aeson. key " msg" . Aeson. nonNull
201
+ localClusterId <- bs ^? Aeson. key " local-member-cluster-id" . Aeson. nonNull
202
+ remoteClusterId <- bs ^? Aeson. key " remote-peer-cluster-id" . Aeson. nonNull
203
+ pure (level', msg', localClusterId, remoteClusterId)
204
+ case expectedClusterMismatch of
205
+ Just (Aeson. String " error" , Aeson. String " request sent was ignored due to cluster ID mismatch" , Aeson. String localClusterID, Aeson. String remotePeerClusterID) ->
206
+ onConnectivity ClusterIDMismatch {localClusterID, remotePeerClusterID}
207
+ _ -> traceWith tracer $ EtcdLog {etcd = v}
197
208
198
209
-- XXX: Could use TLS to secure peer connections
199
210
-- XXX: Could use discovery to simplify configuration
0 commit comments