Skip to content

Commit ec5b396

Browse files
authored
Add a minimum connections configuration to the ConnectionPool (#1822)
Motivation: Sometimes, depending on the characteristics of the traffic, we may need to execute requests in spikes. It's possible that when we get a large spike of requests, a lot of connections are opened, which are subsequently closed by the idle timer. However, when another spike comes next, because we don't have any available open connections, latency will suffer as a result of us having to open new connections. Modifications: Add a ConnectionPool configuration to allow a minimum number of connections to remain open, that is, make a number of connections not go idle. Result: A number of connections will remain open even when there are no open streams on them.
1 parent 7028cfb commit ec5b396

File tree

10 files changed

+180
-11
lines changed

10 files changed

+180
-11
lines changed

Sources/GRPC/ClientConnection.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public final class ClientConnection: Sendable {
131131
self.connectionManager = ConnectionManager(
132132
configuration: configuration,
133133
connectivityDelegate: monitor,
134+
idleBehavior: .closeWhenIdleTimeout,
134135
logger: configuration.backgroundActivityLogger
135136
)
136137
}

Sources/GRPC/ConnectionManager.swift

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ import NIOHTTP2
2525
// event loop is being used.
2626
@usableFromInline
2727
internal final class ConnectionManager: @unchecked Sendable {
28+
29+
/// Whether the connection managed by this manager should be allowed to go idle and be closed, or
30+
/// if it should remain open indefinitely even when there are no active streams.
31+
internal enum IdleBehavior {
32+
case closeWhenIdleTimeout
33+
case neverGoIdle
34+
}
35+
2836
internal enum Reconnect {
2937
case none
3038
case after(TimeInterval)
@@ -324,6 +332,9 @@ internal final class ConnectionManager: @unchecked Sendable {
324332
/// attempts should be made at all.
325333
private let connectionBackoff: ConnectionBackoff?
326334

335+
/// Whether this connection should be allowed to go idle (and thus be closed when the idle timer fires).
336+
internal let idleBehavior: IdleBehavior
337+
327338
/// A logger.
328339
internal var logger: Logger
329340

@@ -356,12 +367,14 @@ internal final class ConnectionManager: @unchecked Sendable {
356367
configuration: ClientConnection.Configuration,
357368
channelProvider: ConnectionManagerChannelProvider? = nil,
358369
connectivityDelegate: ConnectionManagerConnectivityDelegate?,
370+
idleBehavior: IdleBehavior,
359371
logger: Logger
360372
) {
361373
self.init(
362374
eventLoop: configuration.eventLoopGroup.next(),
363375
channelProvider: channelProvider ?? DefaultChannelProvider(configuration: configuration),
364376
callStartBehavior: configuration.callStartBehavior.wrapped,
377+
idleBehavior: idleBehavior,
365378
connectionBackoff: configuration.connectionBackoff,
366379
connectivityDelegate: connectivityDelegate,
367380
http2Delegate: nil,
@@ -373,6 +386,7 @@ internal final class ConnectionManager: @unchecked Sendable {
373386
eventLoop: EventLoop,
374387
channelProvider: ConnectionManagerChannelProvider,
375388
callStartBehavior: CallStartBehavior.Behavior,
389+
idleBehavior: IdleBehavior,
376390
connectionBackoff: ConnectionBackoff?,
377391
connectivityDelegate: ConnectionManagerConnectivityDelegate?,
378392
http2Delegate: ConnectionManagerHTTP2Delegate?,
@@ -392,6 +406,7 @@ internal final class ConnectionManager: @unchecked Sendable {
392406
self.connectionBackoff = connectionBackoff
393407
self.connectivityDelegate = connectivityDelegate
394408
self.http2Delegate = http2Delegate
409+
self.idleBehavior = idleBehavior
395410

396411
self.connectionID = connectionID
397412
self.channelNumber = channelNumber
@@ -799,7 +814,7 @@ internal final class ConnectionManager: @unchecked Sendable {
799814
// Yes, after some time.
800815
case let .after(delay):
801816
let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
802-
// Fail the candidate mux promise. KEep the 'readyChannelMuxPromise' as we'll try again.
817+
// Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
803818
connecting.candidateMuxPromise.fail(error)
804819

805820
let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {

Sources/GRPC/ConnectionPool/ConnectionPool.swift

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ internal final class ConnectionPool {
8585
@usableFromInline
8686
internal let maxWaiters: Int
8787

88+
/// The number of connections in the pool that should always be kept open (i.e. they won't go idle).
89+
/// In other words, it's the number of connections for which we should ignore idle timers.
90+
@usableFromInline
91+
internal let minConnections: Int
92+
8893
/// Configuration for backoff between subsequence connection attempts.
8994
@usableFromInline
9095
internal let connectionBackoff: ConnectionBackoff
@@ -157,6 +162,7 @@ internal final class ConnectionPool {
157162
init(
158163
eventLoop: EventLoop,
159164
maxWaiters: Int,
165+
minConnections: Int,
160166
reservationLoadThreshold: Double,
161167
assumedMaxConcurrentStreams: Int,
162168
connectionBackoff: ConnectionBackoff,
@@ -176,6 +182,7 @@ internal final class ConnectionPool {
176182

177183
self._connections = [:]
178184
self.maxWaiters = maxWaiters
185+
self.minConnections = minConnections
179186
self.waiters = CircularBuffer(initialCapacity: 16)
180187

181188
self.eventLoop = eventLoop
@@ -201,17 +208,25 @@ internal final class ConnectionPool {
201208
]
202209
)
203210
self._connections.reserveCapacity(connections)
211+
var numberOfKeepOpenConnections = self.minConnections
204212
while self._connections.count < connections {
205-
self.addConnectionToPool()
213+
// If we have less than the minimum number of connections, don't let
214+
// the new connection close when idle.
215+
let idleBehavior =
216+
numberOfKeepOpenConnections > 0
217+
? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout
218+
numberOfKeepOpenConnections -= 1
219+
self.addConnectionToPool(idleBehavior: idleBehavior)
206220
}
207221
}
208222

209223
/// Make and add a new connection to the pool.
210-
private func addConnectionToPool() {
224+
private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) {
211225
let manager = ConnectionManager(
212226
eventLoop: self.eventLoop,
213227
channelProvider: self.channelProvider,
214228
callStartBehavior: .waitsForConnectivity,
229+
idleBehavior: idleBehavior,
215230
connectionBackoff: self.connectionBackoff,
216231
connectivityDelegate: self,
217232
http2Delegate: self,
@@ -220,6 +235,19 @@ internal final class ConnectionPool {
220235
let id = manager.id
221236
self._connections[id] = PerConnectionState(manager: manager)
222237
self.delegate?.connectionAdded(id: .init(id))
238+
239+
// If it's one of the connections that should be kept open, then connect
240+
// straight away.
241+
switch idleBehavior {
242+
case .neverGoIdle:
243+
self.eventLoop.execute {
244+
if manager.sync.isIdle {
245+
manager.sync.startConnecting()
246+
}
247+
}
248+
case .closeWhenIdleTimeout:
249+
()
250+
}
223251
}
224252

225253
// MARK: - Called from the pool manager
@@ -689,8 +717,9 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate {
689717
// Grab the number of reserved streams (before invalidating the index by adding a connection).
690718
let reservedStreams = self._connections.values[index].reservedStreams
691719

692-
// Replace the connection with a new idle one.
693-
self.addConnectionToPool()
720+
// Replace the connection with a new idle one. Keep the idle behavior, so that
721+
// if it's a connection that should be kept alive, we maintain it.
722+
self.addConnectionToPool(idleBehavior: manager.idleBehavior)
694723

695724
// Since we're removing this connection from the pool (and no new streams can be created on
696725
// the connection), the pool manager can ignore any streams reserved against this connection.
@@ -881,6 +910,22 @@ extension ConnectionPool {
881910
return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) }
882911
}
883912

913+
/// The number of active (i.e. connecting or ready) connections in the pool.
914+
internal var activeConnections: Int {
915+
self.pool.eventLoop.assertInEventLoop()
916+
return self.pool._connections.values.reduce(0) {
917+
$0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0)
918+
}
919+
}
920+
921+
/// The number of connections in the pool in transient failure state.
922+
internal var transientFailureConnections: Int {
923+
self.pool.eventLoop.assertInEventLoop()
924+
return self.pool._connections.values.reduce(0) {
925+
$0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0)
926+
}
927+
}
928+
884929
/// The number of streams currently available to reserve across all connections in the pool.
885930
internal var availableStreams: Int {
886931
self.pool.eventLoop.assertInEventLoop()

Sources/GRPC/ConnectionPool/GRPCChannelPool.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,10 @@ extension GRPCChannelPool.Configuration {
275275
/// Defaults to 100.
276276
public var maxWaitersPerEventLoop: Int = 100
277277

278+
/// The minimum number of connections to keep open in this pool, per EventLoop.
279+
/// This number of connections per EventLoop will never go idle and be closed.
280+
public var minConnectionsPerEventLoop: Int = 0
281+
278282
/// The maximum amount of time a caller is willing to wait for a stream for before timing out.
279283
///
280284
/// Defaults to 30 seconds.

Sources/GRPC/ConnectionPool/PoolManager.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ internal final class PoolManager {
3333
@usableFromInline
3434
var maxWaiters: Int
3535

36+
/// The minimum number of connections to keep open per pool.
37+
/// This number of connections will never go idle and be closed.
38+
@usableFromInline
39+
var minConnections: Int
40+
3641
/// A load threshold in the range `0.0 ... 1.0` beyond which another connection will be started
3742
/// (assuming there is a connection available to start).
3843
@usableFromInline
@@ -62,6 +67,7 @@ internal final class PoolManager {
6267
internal init(
6368
maxConnections: Int,
6469
maxWaiters: Int,
70+
minConnections: Int,
6571
loadThreshold: Double,
6672
assumedMaxConcurrentStreams: Int = 100,
6773
connectionBackoff: ConnectionBackoff,
@@ -70,6 +76,7 @@ internal final class PoolManager {
7076
) {
7177
self.maxConnections = maxConnections
7278
self.maxWaiters = maxWaiters
79+
self.minConnections = minConnections
7380
self.loadThreshold = loadThreshold
7481
self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
7582
self.connectionBackoff = connectionBackoff
@@ -225,6 +232,7 @@ internal final class PoolManager {
225232
return ConnectionPool(
226233
eventLoop: eventLoop,
227234
maxWaiters: configuration.maxWaiters,
235+
minConnections: configuration.minConnections,
228236
reservationLoadThreshold: configuration.loadThreshold,
229237
assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
230238
connectionBackoff: configuration.connectionBackoff,

Sources/GRPC/ConnectionPool/PooledChannel.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ internal final class PooledChannel: GRPCChannel {
9696
perPoolConfiguration: .init(
9797
maxConnections: configuration.connectionPool.connectionsPerEventLoop,
9898
maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
99+
minConnections: configuration.connectionPool.minConnectionsPerEventLoop,
99100
loadThreshold: configuration.connectionPool.reservationLoadThreshold,
100101
assumedMaxConcurrentStreams: 100,
101102
connectionBackoff: configuration.connectionBackoff,

Sources/GRPC/GRPCIdleHandler.swift

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
2424
typealias OutboundOut = HTTP2Frame
2525

2626
/// The amount of time to wait before closing the channel when there are no active streams.
27-
private let idleTimeout: TimeAmount
27+
/// If nil, then we shouldn't schedule idle tasks.
28+
private let idleTimeout: TimeAmount?
2829

2930
/// The ping handler.
3031
private var pingHandler: PingHandler
@@ -78,7 +79,12 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
7879
logger: Logger
7980
) {
8081
self.mode = .client(connectionManager, multiplexer)
81-
self.idleTimeout = idleTimeout
82+
switch connectionManager.idleBehavior {
83+
case .neverGoIdle:
84+
self.idleTimeout = nil
85+
case .closeWhenIdleTimeout:
86+
self.idleTimeout = idleTimeout
87+
}
8288
self.stateMachine = .init(role: .client, logger: logger)
8389
self.pingHandler = PingHandler(
8490
pingCode: 5,
@@ -135,7 +141,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
135141
}
136142

137143
// Handle idle timeout creation/cancellation.
138-
if let idleTask = operations.idleTask {
144+
if let idleTimeout = self.idleTimeout, let idleTask = operations.idleTask {
139145
switch idleTask {
140146
case let .cancel(task):
141147
self.stateMachine.logger.debug("idle timeout task cancelled")
@@ -145,9 +151,9 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
145151
if self.idleTimeout != .nanoseconds(.max), let context = self.context {
146152
self.stateMachine.logger.debug(
147153
"scheduling idle timeout task",
148-
metadata: [MetadataKey.delayMs: "\(self.idleTimeout.milliseconds)"]
154+
metadata: [MetadataKey.delayMs: "\(idleTimeout.milliseconds)"]
149155
)
150-
let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
156+
let task = context.eventLoop.scheduleTask(in: idleTimeout) {
151157
self.stateMachine.logger.debug("idle timeout task fired")
152158
self.idleTimeoutFired()
153159
}

Tests/GRPCTests/ConnectionManagerTests.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class ConnectionManagerTests: GRPCTestCase {
5959
configuration: configuration,
6060
channelProvider: channelProvider.map { HookedChannelProvider($0) },
6161
connectivityDelegate: self.monitor,
62+
idleBehavior: .closeWhenIdleTimeout,
6263
logger: self.logger
6364
)
6465
}
@@ -948,6 +949,7 @@ extension ConnectionManagerTests {
948949
return loop.makeFailedFuture(DoomedChannelError())
949950
},
950951
connectivityDelegate: nil,
952+
idleBehavior: .closeWhenIdleTimeout,
951953
logger: self.logger
952954
)
953955
let candidate = manager.getHTTP2Multiplexer()
@@ -1207,6 +1209,7 @@ extension ConnectionManagerTests {
12071209
return eventLoop.makeSucceededFuture(channel)
12081210
},
12091211
callStartBehavior: .waitsForConnectivity,
1212+
idleBehavior: .closeWhenIdleTimeout,
12101213
connectionBackoff: ConnectionBackoff(),
12111214
connectivityDelegate: nil,
12121215
http2Delegate: http2,
@@ -1383,6 +1386,7 @@ extension ConnectionManagerTests {
13831386
configuration: configuration,
13841387
channelProvider: Provider(),
13851388
connectivityDelegate: self.monitor,
1389+
idleBehavior: .closeWhenIdleTimeout,
13861390
logger: self.logger
13871391
)
13881392

0 commit comments

Comments
 (0)