Skip to content

Commit 338a6f4

Browse files
LukasaMordil
authored andcommitted
Delay connection attempts without addresses.
In some circumstances users may have connection pools configured without any SocketAddresses ready to go. This is particularly likely in service discovery configurations. Right now, the effect of attempting to use such a pool is two fold. First, we'll emit a bunch of error level logs telling users we have no addresses. Second, we'll fall into the exponential backoff phase of connection establishment. The first property is annoying, but the second one is actively harmful. If your construction is timed incorrectly, we'll have the awkward problem of burning a bunch of CPU trying to create connections we know we cannot, and then a lengthy delay after the addresses are actually configured before we start trying to use them. That's the worst of all worlds. This patch adds logic to detect the attempt to create connections when we don't have any configured addresses and delays them. This path should improve performance and ergonomics when in this mode.
1 parent 5b05e26 commit 338a6f4

File tree

4 files changed

+90
-5
lines changed

4 files changed

+90
-5
lines changed

Sources/RediStack/ConnectionPool/ConnectionPool.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ extension ConnectionPool {
385385
}
386386
self.connectionWaiters.append(waiter)
387387

388-
// What are we going to wait for? Well, now we check. If the number of active connections is
388+
// Ok, we have connection targets. If the number of active connections is
389389
// below the max, or the pool is leaky, we can create a new connection. Otherwise, we just have
390390
// to wait for a connection to come back.
391391
if self.activeConnectionCount < self.maximumConnectionCount || self.leaky {

Sources/RediStack/RedisConnectionPool.swift

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ public class RedisConnectionPool {
4747
private var serverConnectionAddresses: ConnectionAddresses
4848
// This needs to be a var because its value changes as the pool enters/leaves pubsub mode to reuse the same connection.
4949
private var pubsubConnection: RedisConnection?
50+
// This array buffers any request for a connection that cannot be succeeded right away in the case where we have no target.
51+
// We never allow this to get larger than a specific bound, to resist DoS attacks. Past that bound we will fast-fail.
52+
private var requestsForConnections: [EventLoopPromise<RedisConnection>] = []
53+
54+
/// The maximum number of connection requests we'll buffer in `requestsForConnections` before we start fast-failing. These
55+
/// are buffered only when there are no available addresses to connect to, so in practice it's highly unlikely this will be
56+
/// hit, but either way, 100 concurrent connection requests ought to be plenty in this case.
57+
private static let maximumBufferedConnectionRequests = 100
5058

5159
public init(configuration: Configuration, boundEventLoop: EventLoop) {
5260
var config = configuration
@@ -109,6 +117,11 @@ extension RedisConnectionPool {
109117

110118
// This breaks the cycle between us and the pool.
111119
self.pool = nil
120+
121+
// Drop all pending connection attempts. No need to empty this manually, it'll get dropped regardless.
122+
for request in self.requestsForConnections {
123+
request.fail(RedisConnectionPoolError.poolClosed)
124+
}
112125
}
113126
}
114127

@@ -171,6 +184,14 @@ extension RedisConnectionPool {
171184

172185
self.loop.execute {
173186
self.serverConnectionAddresses.update(newAddresses)
187+
188+
// Shiny, we can unbuffer any pending connections and pass them on as they now have somewhere to go.
189+
let unbufferedRequests = self.requestsForConnections
190+
self.requestsForConnections = []
191+
192+
for request in unbufferedRequests {
193+
request.completeWith(self.connectionFactory(self.loop))
194+
}
174195
}
175196
}
176197

@@ -182,8 +203,17 @@ extension RedisConnectionPool {
182203
let factoryConfig = self.configuration.factoryConfiguration
183204

184205
guard let nextTarget = self.serverConnectionAddresses.nextTarget() else {
185-
// No valid connection target, we'll fail.
186-
return targetLoop.makeFailedFuture(RedisConnectionPoolError.noAvailableConnectionTargets)
206+
// No valid connection target, we'll keep track of the request and attempt to satisfy it later.
207+
// First, confirm we have space to keep track of this. If not, fast-fail.
208+
guard self.requestsForConnections.count < RedisConnectionPool.maximumBufferedConnectionRequests else {
209+
return targetLoop.makeFailedFuture(RedisConnectionPoolError.noAvailableConnectionTargets)
210+
}
211+
212+
// Ok, we can buffer, let's do that.
213+
self.prepareLoggerForUse(nil).notice("waiting for target addresses")
214+
let promise = targetLoop.makePromise(of: RedisConnection.self)
215+
self.requestsForConnections.append(promise)
216+
return promise.futureResult
187217
}
188218

189219
let connectionConfig: RedisConnection.Configuration

Sources/RediStackTestUtils/RedisConnectionPoolIntegrationTestCase.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Logging
1516
import NIO
1617
import RediStack
1718
import XCTest
@@ -75,16 +76,19 @@ open class RedisConnectionPoolIntegrationTestCase: XCTestCase {
7576
}
7677

7778
public func makeNewPool(
79+
initialAddresses: [SocketAddress]? = nil,
80+
initialConnectionBackoffDelay: TimeAmount = .milliseconds(100),
7881
connectionRetryTimeout: TimeAmount? = .seconds(5),
7982
minimumConnectionCount: Int = 0
8083
) throws -> RedisConnectionPool {
81-
let address = try SocketAddress.makeAddressResolvingHost(self.redisHostname, port: self.redisPort)
84+
let addresses = try initialAddresses ?? [SocketAddress.makeAddressResolvingHost(self.redisHostname, port: self.redisPort)]
8285
let pool = RedisConnectionPool(
8386
configuration: .init(
84-
initialServerConnectionAddresses: [address],
87+
initialServerConnectionAddresses: addresses,
8588
maximumConnectionCount: .maximumActiveConnections(4),
8689
connectionFactoryConfiguration: .init(connectionPassword: self.redisPassword),
8790
minimumConnectionCount: minimumConnectionCount,
91+
initialConnectionBackoffDelay: initialConnectionBackoffDelay,
8892
connectionRetryTimeout: connectionRetryTimeout
8993
),
9094
boundEventLoop: self.eventLoopGroup.next()

Tests/RediStackIntegrationTests/RedisConnectionPoolTests.swift

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,59 @@ final class RedisConnectionPoolTests: RediStackConnectionPoolIntegrationTestCase
4343

4444
func test_nilConnectionRetryTimeoutStillWorks() throws {
4545
let pool = try self.makeNewPool(connectionRetryTimeout: nil)
46+
defer { pool.close() }
4647
XCTAssertNoThrow(try pool.get(#function).wait())
4748
}
49+
50+
func test_noConnectionAttemptsUntilAddressesArePresent() throws {
51+
// Note the config here: we have no initial addresses, the connecton backoff delay is 10 seconds, and the retry timeout is only 5 seconds.
52+
// The effect of this config is that if we fail a connection attempt, we'll fail it forever.
53+
let pool = try self.makeNewPool(initialAddresses: [], initialConnectionBackoffDelay: .seconds(10), connectionRetryTimeout: .seconds(5), minimumConnectionCount: 0)
54+
defer { pool.close() }
55+
56+
// As above we're gonna try to insert a bunch of elements into a set. This time,
57+
// the pool has no addresses yet. We expect that when we add an address later everything will work nicely.
58+
// We do fewer here.
59+
let operations = (0..<10).map { number in
60+
pool.send(.sadd([number], to: #function))
61+
}
62+
63+
// Now that we've kicked those off, let's hand over a new address.
64+
try pool.updateConnectionAddresses([SocketAddress.makeAddressResolvingHost(self.redisHostname, port: self.redisPort)])
65+
66+
// We should get the results.
67+
let results = try EventLoopFuture<Int>.whenAllSucceed(operations, on: self.eventLoopGroup.next()).wait()
68+
XCTAssertEqual(results, Array(repeating: 1, count: 10))
69+
}
70+
71+
func testDelayedConnectionsFailOnClose() throws {
72+
// Note the config here: we have no initial addresses, the connecton backoff delay is 10 seconds, and the retry timeout is only 5 seconds.
73+
// The effect of this config is that if we fail a connection attempt, we'll fail it forever.
74+
let pool = try self.makeNewPool(initialAddresses: [], initialConnectionBackoffDelay: .seconds(10), connectionRetryTimeout: .seconds(5), minimumConnectionCount: 0)
75+
defer { pool.close() }
76+
77+
// As above we're gonna try to insert a bunch of elements into a set. This time,
78+
// the pool has no addresses yet. We expect that when we add an address later everything will work nicely.
79+
// We do fewer here.
80+
let operations = (0..<10).map { number in
81+
pool.send(.sadd([number], to: #function))
82+
}
83+
84+
// Now that we've kicked those off, let's close.
85+
pool.close()
86+
87+
let results = try EventLoopFuture<Int>.whenAllComplete(operations, on: self.eventLoopGroup.next()).wait()
88+
for result in results {
89+
switch result {
90+
case .success:
91+
XCTFail("Request succeeded")
92+
case .failure(let error) where error as? RedisConnectionPoolError == .poolClosed:
93+
() // Pass
94+
case .failure(let error):
95+
XCTFail("Unexpected failure: \(error)")
96+
}
97+
}
98+
}
4899
}
49100

50101
// MARK: Leasing a connection

0 commit comments

Comments
 (0)