From d37428024cae0165542e0b73891249eb14824d6f Mon Sep 17 00:00:00 2001 From: George Barnett Date: Wed, 7 May 2025 17:19:36 +0100 Subject: [PATCH 1/3] Allow clients to set a max age for connections Motivation: Some load balancers reject new streams when a client certificate has expired but allow existing ones to continue. This is problematic because there's no signal that the connection can't be used anymore, so new RPCs will continue to fail. Modifications: Allow clients to configure a max age for connections, after which time the connection will shut down gracefully. This allows new connections to be established. Result: Client connections can be configured to age out. --- Sources/GRPC/ClientConnection.swift | 10 + .../ConnectionManagerChannelProvider.swift | 8 + .../GRPC/ConnectionPool/GRPCChannelPool.swift | 8 + .../GRPC/ConnectionPool/PooledChannel.swift | 3 + Sources/GRPC/GRPCIdleHandler.swift | 35 ++- .../GRPC/GRPCIdleHandlerStateMachine.swift | 7 + Tests/GRPCTests/ConnectionManagerTests.swift | 17 ++ .../ConnectionPool/ConnectionPoolTests.swift | 1 + Tests/GRPCTests/MaxAgeTests.swift | 241 ++++++++++++++++++ 9 files changed, 324 insertions(+), 6 deletions(-) create mode 100644 Tests/GRPCTests/MaxAgeTests.swift diff --git a/Sources/GRPC/ClientConnection.swift b/Sources/GRPC/ClientConnection.swift index 0749922d1..475cea840 100644 --- a/Sources/GRPC/ClientConnection.swift +++ b/Sources/GRPC/ClientConnection.swift @@ -423,6 +423,14 @@ extension ClientConnection { /// Defaults to 30 minutes. public var connectionIdleTimeout: TimeAmount = .minutes(30) + /// The maximum allowed age of a connection. + /// + /// If set, no new RPCs will be started on the connection after the connection has been opened + /// for this period of time. Existing RPCs will be allowed to continue and the connection will + /// close once all RPCs on the connection have finished. If this isn't set then connections have + /// no limit on their lifetime. + public var connectionMaxAge: TimeAmount? = nil + /// The behavior used to determine when an RPC should start. That is, whether it should wait for /// an active connection or fail quickly if no connection is currently available. /// @@ -635,6 +643,7 @@ extension ChannelPipeline.SynchronousOperations { connectionManager: ConnectionManager, connectionKeepalive: ClientConnectionKeepalive, connectionIdleTimeout: TimeAmount, + connectionMaxAge: TimeAmount?, httpTargetWindowSize: Int, httpMaxFrameSize: Int, httpMaxResetStreams: Int, @@ -672,6 +681,7 @@ extension ChannelPipeline.SynchronousOperations { connectionManager: connectionManager, multiplexer: h2Multiplexer, idleTimeout: connectionIdleTimeout, + maxAge: connectionMaxAge, keepalive: connectionKeepalive, logger: logger ) diff --git a/Sources/GRPC/ConnectionManagerChannelProvider.swift b/Sources/GRPC/ConnectionManagerChannelProvider.swift index 3a23e85c2..6bae5c516 100644 --- a/Sources/GRPC/ConnectionManagerChannelProvider.swift +++ b/Sources/GRPC/ConnectionManagerChannelProvider.swift @@ -60,6 +60,8 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { internal var connectionKeepalive: ClientConnectionKeepalive @usableFromInline internal var connectionIdleTimeout: TimeAmount + @usableFromInline + internal var connectionMaxAge: TimeAmount?
@usableFromInline internal var tlsMode: TLSMode @@ -100,6 +102,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionTarget: ConnectionTarget, connectionKeepalive: ClientConnectionKeepalive, connectionIdleTimeout: TimeAmount, + connectionMaxAge: TimeAmount?, tlsMode: TLSMode, tlsConfiguration: GRPCTLSConfiguration?, httpTargetWindowSize: Int, @@ -113,6 +116,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionTarget: connectionTarget, connectionKeepalive: connectionKeepalive, connectionIdleTimeout: connectionIdleTimeout, + connectionMaxAge: connectionMaxAge, tlsMode: tlsMode, tlsConfiguration: tlsConfiguration, httpTargetWindowSize: httpTargetWindowSize, @@ -131,6 +135,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionTarget: ConnectionTarget, connectionKeepalive: ClientConnectionKeepalive, connectionIdleTimeout: TimeAmount, + connectionMaxAge: TimeAmount?, tlsMode: TLSMode, tlsConfiguration: GRPCTLSConfiguration?, httpTargetWindowSize: Int, @@ -142,6 +147,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { self.connectionTarget = connectionTarget self.connectionKeepalive = connectionKeepalive self.connectionIdleTimeout = connectionIdleTimeout + self.connectionMaxAge = connectionMaxAge self.tlsMode = tlsMode self.tlsConfiguration = tlsConfiguration @@ -182,6 +188,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionTarget: configuration.target, connectionKeepalive: configuration.connectionKeepalive, connectionIdleTimeout: configuration.connectionIdleTimeout, + connectionMaxAge: configuration.connectionMaxAge, tlsMode: tlsMode, tlsConfiguration: configuration.tlsConfiguration, httpTargetWindowSize: configuration.httpTargetWindowSize, @@ -264,6 +271,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionManager: connectionManager, connectionKeepalive: self.connectionKeepalive, connectionIdleTimeout: self.connectionIdleTimeout, + connectionMaxAge: self.connectionMaxAge, httpTargetWindowSize: self.httpTargetWindowSize, httpMaxFrameSize: self.httpMaxFrameSize, httpMaxResetStreams: self.httpMaxResetStreams, diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index e84b903db..53f966d8f 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -156,6 +156,14 @@ extension GRPCChannelPool { /// If a connection becomes idle, starting a new RPC will automatically create a new connection. public var idleTimeout = TimeAmount.minutes(30) + /// The maximum allowed age of a connection. + /// + /// If set, no new RPCs will be started on the connection after the connection has been opened + /// for this period of time. Existing RPCs will be allowed to continue and the connection will + /// close once all RPCs on the connection have finished. If this isn't set then connections have + /// no limit on their lifetime. + public var maxConnectionAge: TimeAmount? = nil + /// The connection keepalive configuration. 
public var keepalive = ClientConnectionKeepalive() diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index 0c7b95fcd..9c4546dc1 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -86,6 +86,7 @@ internal final class PooledChannel: GRPCChannel { connectionTarget: configuration.target, connectionKeepalive: configuration.keepalive, connectionIdleTimeout: configuration.idleTimeout, + connectionMaxAge: configuration.maxConnectionAge, tlsMode: tlsMode, tlsConfiguration: configuration.transportSecurity.tlsConfiguration, httpTargetWindowSize: configuration.http2.targetWindowSize, @@ -100,6 +101,7 @@ internal final class PooledChannel: GRPCChannel { connectionTarget: configuration.target, connectionKeepalive: configuration.keepalive, connectionIdleTimeout: configuration.idleTimeout, + connectionMaxAge: configuration.maxConnectionAge, tlsMode: tlsMode, tlsConfiguration: configuration.transportSecurity.tlsConfiguration, httpTargetWindowSize: configuration.http2.targetWindowSize, @@ -114,6 +116,7 @@ internal final class PooledChannel: GRPCChannel { connectionTarget: configuration.target, connectionKeepalive: configuration.keepalive, connectionIdleTimeout: configuration.idleTimeout, + connectionMaxAge: configuration.maxAge, tlsMode: tlsMode, tlsConfiguration: configuration.transportSecurity.tlsConfiguration, httpTargetWindowSize: configuration.http2.targetWindowSize, diff --git a/Sources/GRPC/GRPCIdleHandler.swift b/Sources/GRPC/GRPCIdleHandler.swift index 0f9492163..4d86f226d 100644 --- a/Sources/GRPC/GRPCIdleHandler.swift +++ b/Sources/GRPC/GRPCIdleHandler.swift @@ -27,11 +27,18 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { /// If nil, then we shouldn't schedule idle tasks. private let idleTimeout: TimeAmount? + /// The maximum amount of time the connection is allowed to live before quiescing. + private let maxAge: TimeAmount? + /// The ping handler. private var pingHandler: PingHandler + /// The scheduled task which will close the connection gently after the max connection age + /// has been reached. + private var scheduledMaxAgeClose: Scheduled<Void>? + /// The scheduled task which will close the connection after the keep-alive timeout has expired. - private var scheduledClose: Scheduled<Void>? + private var scheduledKeepAliveClose: Scheduled<Void>? /// The scheduled task which will ping. private var scheduledPing: RepeatedTask?
@@ -75,6 +82,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { connectionManager: ConnectionManager, multiplexer: HTTP2StreamMultiplexer, idleTimeout: TimeAmount, + maxAge: TimeAmount?, keepalive configuration: ClientConnectionKeepalive, logger: Logger ) { @@ -95,6 +103,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData ) self.creationTime = .now() + self.maxAge = maxAge } init( @@ -116,6 +125,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { maximumPingStrikes: configuration.maximumPingStrikes ) self.creationTime = .now() + self.maxAge = nil } private func perform(operations: GRPCIdleHandlerStateMachine.Operations) { @@ -218,8 +228,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { ) case .cancelScheduledTimeout: - self.scheduledClose?.cancel() - self.scheduledClose = nil + self.scheduledKeepAliveClose?.cancel() + self.scheduledKeepAliveClose = nil case let .schedulePing(delay, timeout): self.schedulePing(in: delay, timeout: timeout) @@ -267,7 +277,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { } private func scheduleClose(in timeout: TimeAmount) { - self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) { + self.scheduledKeepAliveClose = self.context?.eventLoop.scheduleTask(in: timeout) { self.stateMachine.logger.debug("keepalive timer expired") self.perform(operations: self.stateMachine.shutdownNow()) } @@ -334,6 +344,16 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { remote: context.remoteAddress ) + // If a max age has been set then start a timer. This is only cancelled if the channel + // becomes inactive before it fires. + if let maxAge = self.maxAge { + assert(self.scheduledMaxAgeClose == nil) + self.scheduledMaxAgeClose = context.eventLoop.scheduleTask(in: maxAge) { + let operations = self.stateMachine.reachedMaxAge() + self.perform(operations: operations) + } + } + // No state machine action here. switch self.mode { case let .client(connectionManager, multiplexer): @@ -341,15 +361,18 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { case .server: () } + context.fireChannelActive() } func channelInactive(context: ChannelHandlerContext) { self.perform(operations: self.stateMachine.channelInactive()) self.scheduledPing?.cancel() - self.scheduledKeepAliveClose?.cancel() + self.scheduledKeepAliveClose?.cancel() + self.scheduledMaxAgeClose?.cancel() self.scheduledPing = nil - self.scheduledKeepAliveClose = nil + self.scheduledKeepAliveClose = nil + self.scheduledMaxAgeClose = nil context.fireChannelInactive() } diff --git a/Sources/GRPC/GRPCIdleHandlerStateMachine.swift b/Sources/GRPC/GRPCIdleHandlerStateMachine.swift index 5fbbe5c71..fc1134591 100644 --- a/Sources/GRPC/GRPCIdleHandlerStateMachine.swift +++ b/Sources/GRPC/GRPCIdleHandlerStateMachine.swift @@ -465,6 +465,13 @@ struct GRPCIdleHandlerStateMachine { return operations } + /// The connection has reached its max allowable age. Let existing RPCs continue, but don't + /// allow any new ones. + mutating func reachedMaxAge() -> Operations { + // Treat this as if the other side sent us a GOAWAY: gently shut down the connection. + return self.receiveGoAway() + } + /// We've received a GOAWAY frame from the remote peer. Either the remote peer wants to close the /// connection or they're responding to us shutting down the connection.
mutating func receiveGoAway() -> Operations { diff --git a/Tests/GRPCTests/ConnectionManagerTests.swift b/Tests/GRPCTests/ConnectionManagerTests.swift index 77486b0c9..db3127f54 100644 --- a/Tests/GRPCTests/ConnectionManagerTests.swift +++ b/Tests/GRPCTests/ConnectionManagerTests.swift @@ -165,6 +165,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -217,6 +218,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -273,6 +275,7 @@ extension ConnectionManagerTests { inboundStreamInitializer: nil ), idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -322,6 +325,7 @@ extension ConnectionManagerTests { inboundStreamInitializer: nil ), idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -350,6 +354,7 @@ extension ConnectionManagerTests { inboundStreamInitializer: nil ), idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -391,6 +396,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -464,6 +470,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -536,6 +543,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -654,6 +662,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -730,6 +739,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -807,6 +817,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: firstH2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -855,6 +866,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: secondH2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -905,6 +917,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -1063,6 +1076,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -1120,6 +1134,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -1201,6 +1216,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: multiplexer, idleTimeout: .minutes(5), + maxAge: nil, keepalive: ClientConnectionKeepalive(), logger: self.logger ) @@ -1314,6 +1330,7 @@ extension ConnectionManagerTests { connectionManager: connectionManager, multiplexer: multiplexer, idleTimeout: .minutes(60), + maxAge: nil, keepalive: .init(), logger: self.clientLogger ) diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift 
b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index d25489630..2a079183c 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -1299,6 +1299,7 @@ extension ChannelController: ConnectionManagerChannelProvider { connectionManager: connectionManager, multiplexer: multiplexer, idleTimeout: .minutes(5), + maxAge: nil, keepalive: ClientConnectionKeepalive(), logger: logger ) diff --git a/Tests/GRPCTests/MaxAgeTests.swift b/Tests/GRPCTests/MaxAgeTests.swift new file mode 100644 index 000000000..48c15d641 --- /dev/null +++ b/Tests/GRPCTests/MaxAgeTests.swift @@ -0,0 +1,241 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import EchoImplementation +import EchoModel +import GRPC +import NIOConcurrencyHelpers +import NIOCore +import NIOPosix +import XCTest + +final class MaxAgeTests: XCTestCase { + private func withEchoClient( + group: any EventLoopGroup, + configure: (inout GRPCChannelPool.Configuration) -> Void, + test: (Echo_EchoNIOClient) throws -> Void + ) throws { + let eventLoop = MultiThreadedEventLoopGroup.singleton.next() + + let server = try Server.insecure(group: group) + .withServiceProviders([EchoProvider()]) + .bind(host: "127.0.0.1", port: 0) + .wait() + + defer { + try? server.close().wait() + } + + let port = server.channel.localAddress!.port! + + let pool = try GRPCChannelPool.with( + target: .host("127.0.0.1", port: port), + transportSecurity: .plaintext, + eventLoopGroup: eventLoop, + configure + ) + + defer { + try? pool.close().wait() + } + + try test(Echo_EchoNIOClient(channel: pool)) + } + + func testMaxAgeIsRespected() throws { + // Verifies that the max-age config is respected by using the connection pool delegate to + // start new RPCs when each connection closes (which they do by aging out). It'll also record + // various events that happen as part of the lifecycle of each connection. + + // The pool creates one sub-pool per event loop. Use a single loop to simplify connection + // counting. + let eventLoop = MultiThreadedEventLoopGroup.singleton.next() + let done = eventLoop.makePromise(of: [RPCOnConnectionClosedDelegate.Event].self) + let iterations = 2 + let delegate = RPCOnConnectionClosedDelegate(iterations: iterations, done: done) + // This needs to be relatively short so the test doesn't take too long but not so short that + // the connection is closed before it's actually used. + let maxConnectionAge: TimeAmount = .milliseconds(50) + + try withEchoClient(group: eventLoop) { config in + config.maxConnectionAge = maxConnectionAge + config.delegate = delegate + } test: { echo in + // This creates a retain cycle (delegate → echo → channel → delegate), so break it when the + // test is done. + delegate.setEcho(echo) + defer { delegate.setEcho(nil) } + + let startTime = NIODeadline.now() + + // Do an RPC to kick things off.
+ let rpc = try echo.get(.with { $0.text = "hello" }).response.wait() + XCTAssertEqual(rpc.text, "Swift echo get: hello") + + // Wait for the delegate to finish driving the RPCs. + let events = try done.futureResult.wait() + let endTime = NIODeadline.now() + + // Add an iteration as one is done by the test (as opposed to the delegate). Each iteration + // has three events: connected, quiescing, closed. + XCTAssertEqual(events.count, (iterations + 1) * 3) + + // Check each triplet is as expected: connected, quiescing, then closed. + for startIndex in stride(from: events.startIndex, to: events.endIndex, by: 3) { + switch (events[startIndex], events[startIndex + 1], events[startIndex + 2]) { + case (.connectSucceeded(let id1), .connectionQuiescing(let id2), .connectionClosed(let id3)): + XCTAssertEqual(id1, id2) + XCTAssertEqual(id2, id3) + default: + XCTFail("Invalid event triplet: \(events[startIndex ... startIndex + 2])") + } + } + + // Check the duration was in the right ballpark. + let duration = (endTime - startTime) + let minDuration = iterations * maxConnectionAge + XCTAssertGreaterThanOrEqual(duration, minDuration) + // Allow a few seconds of slack for max duration as some CI systems can be slow. + let maxDuration = iterations * maxConnectionAge + .seconds(5) + XCTAssertLessThanOrEqual(duration, maxDuration) + } + } + + private final class RPCOnConnectionClosedDelegate: GRPCConnectionPoolDelegate { + enum Event: Sendable, Hashable { + case connectSucceeded(GRPCConnectionID) + case connectionQuiescing(GRPCConnectionID) + case connectionClosed(GRPCConnectionID) + } + + private struct State { + var events: [Event] = [] + var echo: Echo_EchoNIOClient? = nil + var iterations: Int + } + + private let state: NIOLockedValueBox<State> + private let done: EventLoopPromise<[Event]> + + func setEcho(_ echo: Echo_EchoNIOClient?) { + self.state.withLockedValue { state in + state.echo = echo + } + } + + init(iterations: Int, done: EventLoopPromise<[Event]>) { + self.state = NIOLockedValueBox(State(iterations: iterations)) + self.done = done + } + + func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) { + self.state.withLockedValue { state in + state.events.append(.connectSucceeded(id)) + } + } + + func connectionQuiescing(id: GRPCConnectionID) { + self.state.withLockedValue { state in + state.events.append(.connectionQuiescing(id)) + } + } + + func connectionClosed(id: GRPCConnectionID, error: (any Error)?) { + enum Action { + case doNextRPC(Echo_EchoNIOClient) + case done([Event]) + } + + let action: Action = self.state.withLockedValue { state in + state.events.append(.connectionClosed(id)) + + if state.iterations > 0 { + state.iterations -= 1 + return .doNextRPC(state.echo!) + } else { + return .done(state.events) + } + } + + switch action { + case .doNextRPC(let echo): + // Start an RPC to trigger a connect. The result doesn't matter: + _ = echo.get(.with { $0.text = "hello" }) + case .done(let events): + self.done.succeed(events) + } + } + + func connectionAdded(id: GRPCConnectionID) {} + func connectionRemoved(id: GRPCConnectionID) {} + func startedConnecting(id: GRPCConnectionID) {} + func connectFailed(id: GRPCConnectionID, error: any Error) {} + func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) { + } + } + + func testRPCContinuesAfterQuiescing() throws { + // Check that an in-flight RPC can continue to run after the connection is quiescing as a result + // or aging out. + + // The pool creates one sub-pool per event loop.
Use a single loop to simplify connection + // counting. + let eventLoop = MultiThreadedEventLoopGroup.singleton.next() + let isQuiescing = eventLoop.makePromise(of: Void.self) + + try withEchoClient(group: eventLoop) { config in + config.maxConnectionAge = .milliseconds(50) + config.delegate = SucceedOnQuiescing(promise: isQuiescing) + } test: { echo in + // Send an initial message. + let rpc = echo.collect() + try rpc.sendMessage(.with { $0.text = "1" }).wait() + + // Wait for the connection to quiesce. + try isQuiescing.futureResult.wait() + + // Send a few more messages then end. + try rpc.sendMessage(.with { $0.text = "2" }).wait() + try rpc.sendMessage(.with { $0.text = "3" }).wait() + try rpc.sendEnd().wait() + + let response = try rpc.response.wait() + XCTAssertEqual(response.text, "Swift echo collect: 1 2 3") + } + } + + final class SucceedOnQuiescing: GRPCConnectionPoolDelegate { + private let quiescingPromise: EventLoopPromise<Void> + + init(promise: EventLoopPromise<Void>) { + self.quiescingPromise = promise + } + + func connectionQuiescing(id: GRPCConnectionID) { + self.quiescingPromise.succeed() + } + + func connectionAdded(id: GRPCConnectionID) {} + func connectionRemoved(id: GRPCConnectionID) {} + func startedConnecting(id: GRPCConnectionID) {} + func connectFailed(id: GRPCConnectionID, error: any Error) {} + func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {} + func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) { + } + func connectionClosed(id: GRPCConnectionID, error: (any Error)?) {} + } + +} From 172b6111f75da78203a0789db44608be833386c9 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 8 May 2025 10:38:13 +0100 Subject: [PATCH 2/3] fix rename --- Sources/GRPC/ConnectionPool/PooledChannel.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index 9c4546dc1..255ea3142 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -116,7 +116,7 @@ internal final class PooledChannel: GRPCChannel { connectionTarget: configuration.target, connectionKeepalive: configuration.keepalive, connectionIdleTimeout: configuration.idleTimeout, - connectionMaxAge: configuration.maxAge, + connectionMaxAge: configuration.maxConnectionAge, tlsMode: tlsMode, tlsConfiguration: configuration.transportSecurity.tlsConfiguration, httpTargetWindowSize: configuration.http2.targetWindowSize, From 21dfd77935ba270dacbb7ded888d58ec70dfa93f Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 8 May 2025 13:21:42 +0100 Subject: [PATCH 3/3] Update Tests/GRPCTests/MaxAgeTests.swift Co-authored-by: Gus Cairo --- Tests/GRPCTests/MaxAgeTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/GRPCTests/MaxAgeTests.swift b/Tests/GRPCTests/MaxAgeTests.swift index 48c15d641..8b4cc1120 100644 --- a/Tests/GRPCTests/MaxAgeTests.swift +++ b/Tests/GRPCTests/MaxAgeTests.swift @@ -189,7 +189,7 @@ final class MaxAgeTests: XCTestCase { func testRPCContinuesAfterQuiescing() throws { // Check that an in-flight RPC can continue to run after the connection is quiescing as a result - // or aging out. + // of aging out. // The pool creates one sub-pool per event loop. Use a single loop to simplify connection // counting.
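Example usage of the option these patches add: a minimal sketch of opting a pooled client in to the max-age behaviour, mirroring the configuration used in MaxAgeTests above. The target address, port, and plaintext transport are placeholder choices, not part of the patches; error handling is elided.

    import GRPC
    import NIOCore
    import NIOPosix

    // Build a pooled channel whose connections age out after one hour.
    // "localhost:8080" and plaintext transport are illustrative only.
    func makeAgingChannel(group: any EventLoopGroup) throws -> GRPCChannel {
      return try GRPCChannelPool.with(
        target: .host("localhost", port: 8080),
        transportSecurity: .plaintext,
        eventLoopGroup: group
      ) { configuration in
        // Added by PATCH 1/3: once a connection has been open for an hour it
        // quiesces. In-flight RPCs may run to completion; subsequent RPCs are
        // started on a freshly established connection.
        configuration.maxConnectionAge = .hours(1)
      }
    }

The equivalent knob for the non-pooled client is ClientConnection.Configuration.connectionMaxAge, added in the same patch.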