Skip to content

Commit 42e8d4b

Browse files
committed
Allow repeated commands to same connection in pool
Motivation: Some Redis commands are very connection specific that have impacts on future access that makes it difficult in the current checkout-use-return cycle that `RedisConnectionPool` uses. Developers need a way to borrow a specific connection, chain several commands together, and then return the connection to the pool. Modifications: - Add: `leaseConnection` method to `RedisConnectionPool` which provides a connection from the pool and returns it after a provided closure's ELF resolves - Add: `allowSubscriptions` property to `RedisConnection` for controlling the ability to make PubSub subscriptions - Add: `RedisClientError.pubsubNotAllowed` case for when `RedisConnection.allowSubscriptions` is set to `false` and a subscription was still attempted Result: Developers should now have an "escape hatch" with `RedisConnectionPool` to do limited exclusive chains of operations on a specific connection.
1 parent 56f0ab0 commit 42e8d4b

File tree

9 files changed

+188
-8
lines changed

9 files changed

+188
-8
lines changed

Sources/RediStack/Connection Pool/ConnectionPool.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,13 +366,17 @@ extension ConnectionPool {
366366
// that yet, so double-check. Leave the dead ones there: we'll get them later.
367367
while let connection = self.availableConnections.popLast() {
368368
if connection.isConnected {
369+
logger.debug("found available connection", metadata: [
370+
RedisLogging.MetadataKeys.connectionID: "\(connection.id)"
371+
])
369372
self.leaseConnection(connection, to: waiter)
370373
return waiter.futureResult
371374
}
372375
}
373376

374377
// Ok, we didn't have any available connections. We're going to have to wait. Set our timeout.
375378
waiter.scheduleDeadline(loop: self.loop, deadline: deadline) {
379+
logger.trace("connection not found in time")
376380
// The waiter timed out. We're going to fail the promise and remove the waiter.
377381
waiter.fail(RedisConnectionPoolError.timedOutWaitingForConnection)
378382

@@ -385,6 +389,7 @@ extension ConnectionPool {
385389
// below the max, or the pool is leaky, we can create a new connection. Otherwise, we just have
386390
// to wait for a connection to come back.
387391
if self.activeConnectionCount < self.maximumConnectionCount || self.leaky {
392+
logger.trace("creating new connection")
388393
self._createConnection(backoff: self.initialBackoffDelay, startIn: .nanoseconds(0), logger: logger)
389394
}
390395

Sources/RediStack/RedisClient.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ public struct RedisClientError: LocalizedError, Equatable, Hashable {
249249
public static let connectionClosed = RedisClientError(.connectionClosed)
250250
/// A race condition was triggered between unsubscribing from the last target while subscribing to a new target.
251251
public static let subscriptionModeRaceCondition = RedisClientError(.subscriptionModeRaceCondition)
252+
/// A connection that is not authorized for PubSub subscriptions attempted to create a subscription.
253+
public static let pubsubNotAllowed = RedisClientError(.pubsubNotAllowed)
252254

253255
/// Conversion from `RESPValue` to the specified type failed.
254256
///
@@ -271,6 +273,7 @@ public struct RedisClientError: LocalizedError, Equatable, Hashable {
271273
case let .failedRESPConversion(type): message = "failed to convert RESP to \(type)"
272274
case let .assertionFailure(text): message = text
273275
case .subscriptionModeRaceCondition: message = "received request to subscribe after subscription mode has ended"
276+
case .pubsubNotAllowed: message = "connection attempted to create a PubSub subscription"
274277
}
275278
return "(RediStack) \(message)"
276279
}
@@ -281,6 +284,7 @@ public struct RedisClientError: LocalizedError, Equatable, Hashable {
281284
case .failedRESPConversion: return "Ensure that the data type being requested is actually what's being returned. If you see this error and are not sure why, capture the original RESPValue string sent from Redis to add to your bug report."
282285
case .assertionFailure: return "This error should in theory never happen. If you trigger this error, capture the original RESPValue string sent from Redis along with the command and arguments that you sent to Redis to add to your bug report."
283286
case .subscriptionModeRaceCondition: return "This is a race condition where the PubSub handler was removed after a subscription was being added, but before it was committed. This can be solved by just retrying the subscription."
287+
case .pubsubNotAllowed: return "When connections are managed by a pool, they are not allowed to create PubSub subscriptions on their own. Use the appropriate PubSub commands on the connection pool itself. If the connection is not managed by a pool, this is a bug and should be reported."
284288
}
285289
}
286290

@@ -303,5 +307,6 @@ public struct RedisClientError: LocalizedError, Equatable, Hashable {
303307
case failedRESPConversion(to: Any.Type)
304308
case assertionFailure(message: String)
305309
case subscriptionModeRaceCondition
310+
case pubsubNotAllowed
306311
}
307312
}

Sources/RediStack/RedisConnection.swift

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,29 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
125125
autoflush.store(newValue)
126126
}
127127
}
128-
128+
/// Controls the permission of the connection to be able to have PubSub subscriptions or not.
129+
///
130+
/// When set to `true`, this connection is allowed to create subscriptions.
131+
/// When set to `false`, this connection is not allowed to create subscriptions. Any potentially existing subscriptions will be removed.
132+
public var allowSubscriptions: Bool {
133+
get { self.allowPubSub.load() }
134+
set(newValue) {
135+
self.allowPubSub.store(newValue)
136+
// TODO: Re-enable after [p]unsubscribe from all is fixed
137+
// guard self.isConnected else { return }
138+
// _ = EventLoopFuture<Void>.whenAllComplete([
139+
// self.unsubscribe(),
140+
// self.punsubscribe()
141+
// ], on: self.eventLoop)
142+
}
143+
}
144+
129145
internal let channel: Channel
130146
private let systemContext: Context
131147
private var logger: Logger { self.systemContext }
132148

133149
private let autoflush: NIOAtomic<Bool> = .makeAtomic(value: true)
150+
private let allowPubSub: NIOAtomic<Bool> = .makeAtomic(value: true)
134151
private let _stateLock = Lock()
135152
private var _state = ConnectionState.open
136153
private var state: ConnectionState {
@@ -418,6 +435,11 @@ extension RedisConnection {
418435
// if we're closed, just error out
419436
guard self.state.isConnected else { return self.eventLoop.makeFailedFuture(RedisClientError.connectionClosed) }
420437

438+
// if we're not allowed to to subscribe, then fail
439+
guard self.allowSubscriptions else {
440+
return self.eventLoop.makeFailedFuture(RedisClientError.pubsubNotAllowed)
441+
}
442+
421443
logger.trace("adding subscription", metadata: [
422444
RedisLogging.MetadataKeys.pubsubTarget: "\(target.debugDescription)"
423445
])

Sources/RediStack/RedisConnectionPool.swift

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,47 @@ extension RedisConnectionPool {
148148
}
149149
}
150150

151+
/// Provides limited exclusive access to a connection to be used in a user-defined specialized closure of operations.
152+
/// - Warning: Attempting to create PubSub subscriptions with connections leased in the closure will result in a failed `NIO.EventLoopFuture`.
153+
///
154+
/// `RedisConnectionPool` manages PubSub state and requires exclusive control over creating PubSub subscriptions.
155+
/// - Important: This connection **MUST NOT** be stored outside of the closure. It is only available exclusively within the closure.
156+
///
157+
/// All operations should be done inside the closure as chained `NIO.EventLoopFuture` callbacks.
158+
///
159+
/// For example:
160+
/// ```swift
161+
/// let countFuture = pool.leaseConnection {
162+
/// $0.logging(to: myLogger)
163+
/// .authorize(with: userPassword)
164+
/// .flatMap { connection.select(database: userDatabase) }
165+
/// .flatMap { connection.increment(counterKey) }
166+
/// }
167+
/// ```
168+
/// - Warning: Some commands change the state of the connection that are not tracked client-side,
169+
/// and will not be automatically reset when the connection is returned to the pool.
170+
///
171+
/// When the connection is reused from the pool, it will retain this state and may affect future commands executed with it.
172+
///
173+
/// For example, if `select(database:)` is used, all future commands made with this connection will be against the selected database.
174+
///
175+
/// To protect against future issues, make sure the final commands executed are to reset the connection to it's previous known state.
176+
/// - Parameter operation: A closure that receives exclusive access to the provided `RedisConnection` for the lifetime of the closure for specialized Redis command chains.
177+
/// - Returns: A `NIO.EventLoopFuture` that resolves the value of the `NIO.EventLoopFuture` in the provided closure operation.
178+
@inlinable
179+
public func leaseConnection<T>(_ operation: @escaping (RedisConnection) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
180+
return self.forwardOperationToConnection(
181+
{
182+
(connection, returnConnection, context) in
183+
184+
return operation(connection)
185+
.always { _ in returnConnection(connection, context) }
186+
},
187+
preferredConnection: nil,
188+
context: nil
189+
)
190+
}
191+
151192
/// Updates the list of valid connection addresses.
152193
///
153194
/// - Note: This does not invalidate existing connections: as long as those connections continue to stay up, they will be kept by
@@ -179,13 +220,16 @@ extension RedisConnectionPool {
179220
return targetLoop.makeFailedFuture(RedisConnectionPoolError.noAvailableConnectionTargets)
180221
}
181222

182-
return RedisConnection.connect(
223+
let connectFuture = RedisConnection.connect(
183224
to: nextTarget,
184225
on: targetLoop,
185226
password: self.connectionPassword,
186227
logger: self.connectionSystemContext,
187228
tcpClient: self.connectionTCPClient
188229
)
230+
// disallow subscriptions on all connections by default so that we can enforce our management of PubSub state
231+
connectFuture.whenSuccess { $0.allowSubscriptions = false }
232+
return connectFuture
189233
}
190234

191235
private func prepareLoggerForUse(_ logger: Logger?) -> Logger {
@@ -323,7 +367,10 @@ extension RedisConnectionPool: RedisClientWithUserContext {
323367
return self.forwardOperationToConnection(
324368
{ (connection, returnConnection, context) in
325369

326-
if self.pubsubConnection == nil { self.pubsubConnection = connection }
370+
if self.pubsubConnection == nil {
371+
connection.allowSubscriptions = true // allow pubsub commands which are to come
372+
self.pubsubConnection = connection
373+
}
327374

328375
let onUnsubscribe: RedisSubscriptionChangeHandler = { channelName, subCount in
329376
defer { unsubscribeHandler?(channelName, subCount) }
@@ -332,7 +379,8 @@ extension RedisConnectionPool: RedisClientWithUserContext {
332379
subCount == 0,
333380
let connection = self.pubsubConnection
334381
else { return }
335-
382+
383+
connection.allowSubscriptions = false // reset PubSub permissions
336384
returnConnection(connection, context)
337385
self.pubsubConnection = nil // break ref cycle
338386
}
@@ -367,7 +415,8 @@ extension RedisConnectionPool: RedisClientWithUserContext {
367415
)
368416
}
369417

370-
private func forwardOperationToConnection<T>(
418+
@usableFromInline
419+
internal func forwardOperationToConnection<T>(
371420
_ operation: @escaping (RedisConnection, @escaping (RedisConnection, Context) -> Void, Context) -> EventLoopFuture<T>,
372421
preferredConnection: RedisConnection?,
373422
context: Context?

Sources/RediStack/RedisContext.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ import struct Logging.Logger
2020
// so in order to be "future thinking" we create this typealias and interally refer to this passing of configuration
2121
// as context
2222

23+
@usableFromInline
2324
internal typealias Context = Logging.Logger

Sources/RediStackTestUtils/RedisConnectionPoolIntegrationTestCase.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,15 @@ open class RedisConnectionPoolIntegrationTestCase: XCTestCase {
7575
}
7676

7777
public func makeNewPool(
78-
connectionRetryTimeout: TimeAmount? = .seconds(5)
78+
connectionRetryTimeout: TimeAmount? = .seconds(5),
79+
minimumConnectionCount: Int = 0
7980
) throws -> RedisConnectionPool {
8081
let address = try SocketAddress.makeAddressResolvingHost(self.redisHostname, port: self.redisPort)
8182
let pool = RedisConnectionPool(
8283
serverConnectionAddresses: [address],
8384
loop: self.eventLoopGroup.next(),
8485
maximumConnectionCount: .maximumActiveConnections(4),
85-
minimumConnectionCount: 0,
86+
minimumConnectionCount: minimumConnectionCount,
8687
connectionPassword: self.redisPassword,
8788
connectionRetryTimeout: connectionRetryTimeout
8889
)

Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ final class RedisPubSubCommandsPoolTests: RediStackConnectionPoolIntegrationTest
159159

160160
let channel = RedisChannelName(#function)
161161
let pattern = "\(channel.rawValue.dropLast(channel.rawValue.count / 2))*"
162-
print(channel, pattern)
163162

164163
try subscriber
165164
.subscribe(to: channel) { (_, _) in channelMessageExpectation.fulfill() }

Tests/RediStackIntegrationTests/RedisConnectionPoolTests.swift

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,61 @@ final class RedisConnectionPoolTests: RediStackConnectionPoolIntegrationTestCase
4343
XCTAssertNoThrow(try pool.get(#function).wait())
4444
}
4545
}
46+
47+
// MARK: Leasing a connection
48+
49+
extension RedisConnectionPoolTests {
50+
func test_borrowedConnectionStillReturnsOnError() throws {
51+
enum TestError: Error { case expected }
52+
53+
let maxConnectionCount = 4
54+
let pool = try self.makeNewPool(minimumConnectionCount: maxConnectionCount)
55+
defer { pool.close() }
56+
_ = try pool.ping().wait()
57+
58+
let promise = pool.eventLoop.makePromise(of: Void.self)
59+
60+
XCTAssertEqual(pool.availableConnectionCount, maxConnectionCount)
61+
defer { XCTAssertEqual(pool.availableConnectionCount, maxConnectionCount) }
62+
63+
let future = pool.leaseConnection { _ in promise.futureResult }
64+
65+
promise.fail(TestError.expected)
66+
XCTAssertThrowsError(try future.wait()) {
67+
XCTAssertTrue($0 is TestError)
68+
}
69+
}
70+
71+
func test_borrowedConnectionClosureHasExclusiveAccess() throws {
72+
let maxConnectionCount = 4
73+
let pool = try self.makeNewPool(minimumConnectionCount: maxConnectionCount)
74+
defer { pool.close() }
75+
// populate the connection pool
76+
_ = try pool.ping().wait()
77+
78+
// assert that we have the max number of connections available,
79+
XCTAssertEqual(pool.availableConnectionCount, maxConnectionCount)
80+
81+
// borrow a connection, asserting that we've taken the connection out of the pool while we do "something" with it
82+
// and then assert afterwards that it's back in the pool
83+
84+
let promises: [EventLoopPromise<Void>] = [pool.eventLoop.makePromise(), pool.eventLoop.makePromise()]
85+
let futures = promises.indices
86+
.map { index in
87+
return pool
88+
.leaseConnection { connection -> EventLoopFuture<Void> in
89+
XCTAssertTrue(pool.availableConnectionCount < maxConnectionCount)
90+
91+
return promises[index].futureResult
92+
}
93+
}
94+
95+
promises.forEach { $0.succeed(()) }
96+
_ = try EventLoopFuture<Void>
97+
.whenAllSucceed(futures, on: pool.eventLoop)
98+
.always { _ in
99+
XCTAssertEqual(pool.availableConnectionCount, maxConnectionCount)
100+
}
101+
.wait()
102+
}
103+
}

Tests/RediStackIntegrationTests/RedisConnectionTests.swift

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,43 @@ final class RedisConnectionTests: RediStackIntegrationTestCase {
4040
}
4141
}
4242
}
43+
44+
// MARK: PubSub permissions
45+
46+
extension RedisConnectionTests {
47+
func test_subscriptionNotAllowedFails() throws {
48+
self.connection.allowSubscriptions = false
49+
let subscription = self.connection.subscribe(to: #function) { (_, _) in }
50+
51+
XCTAssertThrowsError(try subscription.wait()) {
52+
guard let error = $0 as? RedisClientError else {
53+
XCTFail("unexpected error type: \(type(of: $0))")
54+
return
55+
}
56+
XCTAssertEqual(error, .pubsubNotAllowed)
57+
}
58+
}
59+
60+
// TODO - fix [p]unsubscribe from all and re-enable this unit test
61+
// func test_subscriptionPermissionsChanged_endsSubscriptions() throws {
62+
// let connection = try self.makeNewConnection()
63+
//
64+
// let channelSubClosedExpectation = self.expectation(description: "channel subscription was closed")
65+
// let patternSubClosedExpectation = self.expectation(description: "pattern subscription was closed")
66+
//
67+
// _ = connection.subscribe(
68+
// to: #function,
69+
// messageReceiver: { (_, _) in },
70+
// onUnsubscribe: { (_, _) in channelSubClosedExpectation.fulfill() }
71+
// )
72+
// _ = connection.psubscribe(
73+
// to: #function,
74+
// messageReceiver: { (_, _) in },
75+
// onUnsubscribe: { (_, _) in patternSubClosedExpectation.fulfill() }
76+
// )
77+
//
78+
// connection.allowSubscriptions = false
79+
//
80+
// self.waitForExpectations(timeout: 2)
81+
// }
82+
}

0 commit comments

Comments
 (0)