diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift index 6d10774..1a4330a 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift @@ -49,7 +49,7 @@ package final class Connection: Sendable { /// The connect attempt succeeded and the connection is ready to use. case connectSucceeded /// The connect attempt failed. - case connectFailed(any Error) + case connectFailed(RPCError) /// The connection received a GOAWAY and will close soon. No new streams /// should be opened on this connection. case goingAway(HTTP2ErrorCode, String) @@ -68,7 +68,7 @@ package final class Connection: Sendable { /// Closed because the remote peer initiate shutdown (i.e. sent a GOAWAY frame). case remote /// Closed because the connection encountered an unexpected error. - case error(any Error, wasIdle: Bool) + case error(RPCError, wasIdle: Bool) } /// Inputs to the 'run' method. @@ -127,9 +127,20 @@ package final class Connection: Sendable { /// This function returns when the connection has closed. You can observe connection events /// by consuming the ``events`` sequence. package func run() async { - let connectResult = await Result { - try await self.http2Connector.establishConnection(to: self.address) + func establishConnectionOrThrow() async throws(RPCError) -> HTTP2Connection { + do { + return try await self.http2Connector.establishConnection(to: self.address) + } catch let error as RPCError { + throw error + } catch { + throw RPCError( + code: .unavailable, + message: "Could not establish a connection to \(self.address).", + cause: error + ) + } } + let connectResult = await Result(catching: establishConnectionOrThrow) switch connectResult { case .success(let connected): @@ -236,6 +247,7 @@ package final class Connection: Sendable { // This state is tracked here so that if the connection events sequence finishes and the // connection never became ready then the connection can report that the connect failed. var isReady = false + var unexpectedCloseError: (any Error)? func makeNeverReadyError(cause: (any Error)?) -> RPCError { return RPCError( @@ -265,10 +277,15 @@ package final class Connection: Sendable { // The connection will close at some point soon, yield a notification for this // because the close might not be imminent and this could result in address resolution. self.event.continuation.yield(.goingAway(errorCode, reason)) - case .idle, .keepaliveExpired, .initiatedLocally, .unexpected: + case .idle, .keepaliveExpired, .initiatedLocally: // The connection will be closed imminently in these cases there's no need to do // anything. () + case .unexpected(let error, _): + // The connection will be closed imminently in this case. + // We'll store the error that caused the unexpected closure so we + // can surface it. + unexpectedCloseError = error } // Take the reason with the highest precedence. A GOAWAY may be superseded by user @@ -318,7 +335,7 @@ package final class Connection: Sendable { finalEvent = .closed(connectionCloseReason) } else { // The connection never became ready, this therefore counts as a failed connect attempt. - finalEvent = .connectFailed(makeNeverReadyError(cause: nil)) + finalEvent = .connectFailed(makeNeverReadyError(cause: unexpectedCloseError)) } // The connection events sequence has finished: the connection is now closed. diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectivityState.swift b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectivityState.swift index 6f4b000..8ef3a89 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectivityState.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectivityState.swift @@ -14,6 +14,8 @@ * limitations under the License. */ +package import GRPCCore + package enum ConnectivityState: Sendable, Hashable { /// This channel isn't trying to create a connection because of a lack of new or pending RPCs. /// @@ -34,7 +36,7 @@ package enum ConnectivityState: Sendable, Hashable { /// establish a connection again. Since retries are done with exponential backoff, channels that /// fail to connect will start out spending very little time in this state but as the attempts /// fail repeatedly, the channel will spend increasingly large amounts of time in this state. - case transientFailure + case transientFailure(cause: RPCError) /// This channel has started shutting down. Any new RPCs should fail immediately. Pending RPCs /// may continue running until the application cancels them. Channels may enter this state either diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift b/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift index 9f3c1fe..8bd5e46 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift @@ -290,7 +290,7 @@ extension GRPCChannel { } case .failRPC: - return .stopTrying(RPCError(code: .unavailable, message: "channel isn't ready")) + return .stopTrying(RPCError(code: .unavailable, message: "Channel isn't ready.")) } } @@ -300,7 +300,7 @@ extension GRPCChannel { loadBalancer: LoadBalancer ) async -> MakeStreamResult { guard let subchannel = loadBalancer.pickSubchannel() else { - return .tryAgain(RPCError(code: .unavailable, message: "channel isn't ready")) + return .tryAgain(RPCError(code: .unavailable, message: "Channel isn't ready.")) } let methodConfig = self.config(forMethod: descriptor) @@ -758,14 +758,24 @@ extension GRPCChannel.StateMachine { result: .success(state.current) ) - case .transientFailure, .shutdown: // shutdown includes shutting down + case .transientFailure(let cause): // Current load-balancer failed. Remove all the 'fast-failing' continuations in the // queue, these are RPCs which set the 'wait for ready' option to false. The rest of // the entries in the queue will wait for a load-balancer to become ready. let continuations = state.queue.removeFastFailingEntries() actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations( continuations: continuations, - result: .failure(RPCError(code: .unavailable, message: "channel isn't ready")) + result: .failure(cause) + ) + + case .shutdown: // shutdown includes shutting down + // Current load-balancer failed. Remove all the 'fast-failing' continuations in the + // queue, these are RPCs which set the 'wait for ready' option to false. The rest of + // the entries in the queue will wait for a load-balancer to become ready. + let continuations = state.queue.removeFastFailingEntries() + actions.resumeContinuations = ConnectivityStateChangeActions.ResumableContinuations( + continuations: continuations, + result: .failure(RPCError(code: .unavailable, message: "Channel isn't ready.")) ) case .idle, .connecting: diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/RoundRobinLoadBalancer.swift b/Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/RoundRobinLoadBalancer.swift index f0185ef..abdacb1 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/RoundRobinLoadBalancer.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/RoundRobinLoadBalancer.swift @@ -530,7 +530,7 @@ extension RoundRobinLoadBalancer { // The transition from transient failure to connecting is ignored. // // See: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md - if self.state == .transientFailure, newState == .connecting { + if case .transientFailure = self.state, newState == .connecting { return false } @@ -750,12 +750,26 @@ extension ConnectivityState { return .idle } - // Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state - // is TRANSIENT_FAILURE. - if states.allSatisfy({ $0 == .transientFailure }) { - return .transientFailure + // Otherwise, if all subchannels are in state TRANSIENT_FAILURE, the channel's state is TRANSIENT_FAILURE. + var cause: RPCError? + for state in states { + switch state { + case .transientFailure(let error): + // Pick one of the errors to surface, as we can't surface all of them. + cause = error + case .shutdown: + return .shutdown + case .idle, .connecting, .ready: + fatalError("Unreachable state: these should have been handled above.") + } } - return .shutdown + if let cause { + return .transientFailure(cause: cause) + } else { + // We can only reach this point without a `cause` if `states` was empty. + // Fall back to shutdown: we have nothing better to do. + return .shutdown + } } } diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/Subchannel.swift b/Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/Subchannel.swift index a9947cb..b2be017 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/Subchannel.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/Subchannel.swift @@ -256,8 +256,8 @@ extension Subchannel { switch event { case .connectSucceeded: self.handleConnectSucceededEvent() - case .connectFailed: - self.handleConnectFailedEvent(in: &group) + case .connectFailed(let cause): + self.handleConnectFailedEvent(in: &group, error: cause) case .goingAway: self.handleGoingAwayEvent() case .closed(let reason): @@ -282,7 +282,7 @@ extension Subchannel { } } - private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup) { + private func handleConnectFailedEvent(in group: inout DiscardingTaskGroup, error: RPCError) { let onConnectFailed = self.state.withLock { $0.connectFailed(connector: self.connector) } switch onConnectFailed { case .connect(let connection): @@ -291,7 +291,11 @@ extension Subchannel { case .backoff(let duration): // All addresses have been tried, backoff for some time. - self.event.continuation.yield(.connectivityStateChanged(.transientFailure)) + self.event.continuation.yield( + .connectivityStateChanged( + .transientFailure(cause: error) + ) + ) group.addTask { do { try await Task.sleep(for: duration) @@ -334,9 +338,9 @@ extension Subchannel { case .emitIdle: self.event.continuation.yield(.connectivityStateChanged(.idle)) - case .emitTransientFailureAndReconnect: + case .emitTransientFailureAndReconnect(let cause): // Unclean closes trigger a transient failure state change and a name resolution. - self.event.continuation.yield(.connectivityStateChanged(.transientFailure)) + self.event.continuation.yield(.connectivityStateChanged(.transientFailure(cause: cause))) self.event.continuation.yield(.requiresNameResolution) // Attempt to reconnect. self.handleConnectInput(in: &group) @@ -632,7 +636,7 @@ extension Subchannel { enum OnClosed { case nothing case emitIdle - case emitTransientFailureAndReconnect + case emitTransientFailureAndReconnect(cause: RPCError) case finish(emitShutdown: Bool) } @@ -646,9 +650,21 @@ extension Subchannel { self = .notConnected(NotConnected(from: state)) onClosed = .emitIdle - case .keepaliveTimeout, .error(_, wasIdle: false): + case .keepaliveTimeout: + self = .notConnected(NotConnected(from: state)) + onClosed = .emitTransientFailureAndReconnect( + cause: RPCError( + code: .unavailable, + message: """ + The connection became unresponsive and was closed because the \ + keepalive timeout fired. + """ + ) + ) + + case .error(let error, wasIdle: false): self = .notConnected(NotConnected(from: state)) - onClosed = .emitTransientFailureAndReconnect + onClosed = .emitTransientFailureAndReconnect(cause: error) case .initiatedLocally: // Should be in the 'shuttingDown' state. diff --git a/Sources/GRPCNIOTransportCore/Internal/Result+Catching.swift b/Sources/GRPCNIOTransportCore/Internal/Result+Catching.swift index 5c5060e..6b5bbf2 100644 --- a/Sources/GRPCNIOTransportCore/Internal/Result+Catching.swift +++ b/Sources/GRPCNIOTransportCore/Internal/Result+Catching.swift @@ -14,12 +14,12 @@ * limitations under the License. */ -extension Result where Failure == any Error { +extension Result { /// Like `Result(catching:)`, but `async`. /// /// - Parameter body: An `async` closure to catch the result of. @inlinable - init(catching body: () async throws -> Success) async { + init(catching body: () async throws(Failure) -> Success) async { do { self = .success(try await body()) } catch { diff --git a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Connection+Equatable.swift b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Connection+Equatable.swift index 393e2f8..8d09c60 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Connection+Equatable.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Connection+Equatable.swift @@ -54,11 +54,7 @@ extension Connection.CloseReason { return true case (.error(let lhsError, let lhsStreams), .error(let rhsError, let rhsStreams)): - if let lhs = lhsError as? RPCError, let rhs = rhsError as? RPCError { - return lhs == rhs && lhsStreams == rhsStreams - } else { - return lhsStreams == rhsStreams - } + return lhsError == rhsError && lhsStreams == rhsStreams default: return false diff --git a/Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift b/Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift index 5ef50ca..069a03a 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift @@ -161,7 +161,15 @@ final class SubchannelTests: XCTestCase { [ .connectivityStateChanged(.idle), .connectivityStateChanged(.connecting), - .connectivityStateChanged(.transientFailure), + .connectivityStateChanged( + .transientFailure( + cause: RPCError( + code: .unavailable, + message: + "Could not establish a connection to [unix]test-connect-eventually-succeeds." + ) + ) + ), .connectivityStateChanged(.connecting), ] ) @@ -440,7 +448,14 @@ final class SubchannelTests: XCTestCase { .connectivityStateChanged(.idle), .connectivityStateChanged(.connecting), .connectivityStateChanged(.ready), - .connectivityStateChanged(.transientFailure), + .connectivityStateChanged( + .transientFailure( + cause: RPCError( + code: .unavailable, + message: "The TCP connection was dropped unexpectedly." + ) + ) + ), .requiresNameResolution, .connectivityStateChanged(.connecting), .connectivityStateChanged(.ready),