//===----------------------------------------------------------------------===//
//
// This source file is part of the RediStack open source project
//
// Copyright (c) 2020 RediStack project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of RediStack project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import struct Foundation.UUID
import NIO
import NIOConcurrencyHelpers
import Logging

/// A `RedisConnectionPool` is an implementation of `RedisClient` backed by a pool of connections to Redis,
/// rather than a single one.
///
/// `RedisConnectionPool` uses a pool of connections on a single `EventLoop` to manage its activity. This
/// pool may vary in size and strategy, including how many active connections it tries to manage at any one
/// time and how it responds to demand for connections beyond its upper limit.
///
/// Note that `RedisConnectionPool` is entirely thread-safe, even though all of its connections belong to a
/// single `EventLoop`: if callers call the API from a different `EventLoop` (or from no `EventLoop` at all)
/// `RedisConnectionPool` will ensure that the call is dispatched to the correct loop.
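///
/// A minimal usage sketch (the event loop group, host, and port below are illustrative placeholders,
/// not part of this API):
///
///     let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
///     let pool = RedisConnectionPool(
///         serverConnectionAddresses: [try .makeAddressResolvingHost("localhost", port: 6379)],
///         loop: group.next(),
///         maximumConnectionCount: .maximumActiveConnections(10)
///     )
///     pool.activate()
///     let reply = pool.send(command: "PING", with: [])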
public class RedisConnectionPool {
    // This needs to be var because we hand it a closure that references us strongly. This also
    // establishes a reference cycle which we need to break.
    // Aside from on init, all other operations on this var must occur on the event loop.
    private var pool: ConnectionPool?

    /// This needs to be var because it can be updated over time. As a result, aside from init,
    /// all use of this var must occur on the event loop.
    private var serverConnectionAddresses: ConnectionAddresses

    private let loop: EventLoop

    private var poolLogger: Logger

    /// This lock exists only to guard access to the pool logger: we need to be able to hand the logger
    /// to users in a thread-safe way, as users can also set it from any thread they want.
    private let poolLoggerLock: Lock

    private let connectionPassword: String?

    private let connectionLogger: Logger

    private let connectionTCPClient: ClientBootstrap?

    private let poolID: UUID

    /// Create a new `RedisConnectionPool`.
    ///
    /// - parameters:
    ///     - serverConnectionAddresses: The set of Redis servers to which this pool is initially willing to connect.
    ///         This set can be updated over time.
    ///     - loop: The event loop to which this pooled client is tied.
    ///     - maximumConnectionCount: The maximum number of connections for this pool, either to be preserved or as a hard limit.
    ///     - minimumConnectionCount: The minimum number of connections to preserve in the pool. If the pool is mostly idle
    ///         and the Redis servers close these idle connections, the `RedisConnectionPool` will initiate new outbound
    ///         connections proactively to avoid the number of available connections dropping below this number. Defaults to `1`.
    ///     - connectionPassword: The password to use to connect to the Redis servers in this pool.
    ///     - connectionLogger: The `Logger` to pass to each connection in the pool.
    ///     - connectionTCPClient: The base `ClientBootstrap` to use to create pool connections, if a custom one is in use.
    ///     - poolLogger: The `Logger` used by the connection pool itself.
    ///     - connectionBackoffFactor: Used when connection attempts fail to control the exponential backoff. This is a multiplicative
    ///         factor: each connection attempt will be delayed by this amount times the previous delay.
    ///     - initialConnectionBackoffDelay: If a TCP connection attempt fails, this is the first backoff value on the reconnection attempt.
    ///         Subsequent backoffs are computed by compounding this value by `connectionBackoffFactor`.
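    ///
    /// For example, with the default `initialConnectionBackoffDelay` of 100ms and `connectionBackoffFactor` of 2,
    /// successive failed connection attempts are delayed by 100ms, 200ms, 400ms, 800ms, and so on.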
    public init(
        serverConnectionAddresses: [SocketAddress],
        loop: EventLoop,
        maximumConnectionCount: RedisConnectionPoolSize,
        minimumConnectionCount: Int = 1,
        connectionPassword: String? = nil,
        connectionLogger: Logger = .init(label: "RediStack.RedisConnection"),
        connectionTCPClient: ClientBootstrap? = nil,
        poolLogger: Logger = .init(label: "RediStack.RedisConnectionPool"),
        connectionBackoffFactor: Float32 = 2,
        initialConnectionBackoffDelay: TimeAmount = .milliseconds(100)
    ) {
        self.poolID = UUID()
        self.loop = loop
        self.serverConnectionAddresses = ConnectionAddresses(initialAddresses: serverConnectionAddresses)
        self.connectionPassword = connectionPassword

        var connectionLogger = connectionLogger
        connectionLogger[metadataKey: String(describing: RedisConnectionPool.self)] = "\(self.poolID)"
        self.connectionLogger = connectionLogger

        var poolLogger = poolLogger
        poolLogger[metadataKey: String(describing: RedisConnectionPool.self)] = "\(self.poolID)"
        self.poolLogger = poolLogger

        self.connectionTCPClient = connectionTCPClient
        self.poolLoggerLock = Lock()

        self.pool = ConnectionPool(
            maximumConnectionCount: maximumConnectionCount.size,
            minimumConnectionCount: minimumConnectionCount,
            leaky: maximumConnectionCount.leaky,
            loop: loop,
            logger: poolLogger,
            connectionBackoffFactor: connectionBackoffFactor,
            initialConnectionBackoffDelay: initialConnectionBackoffDelay,
            connectionFactory: self.connectionFactory(_:)
        )
    }
}

// MARK: General helpers.
extension RedisConnectionPool {
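    /// Activates the underlying connection pool, dispatching to the pool's event loop if needed.
    /// Like the rest of this type's API, this is safe to call from any thread.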
    public func activate() {
        self.loop.execute {
            self.pool?.activate()
        }
    }

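    /// Closes the pool, and breaks the reference cycle between this object and its pool.
    /// Once the pool has closed, any subsequent `send` will fail with `RedisConnectionPoolError.poolClosed`.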
    public func close() {
        self.loop.execute {
            self.pool?.close()

            // This breaks the cycle between us and the pool.
            self.pool = nil
        }
    }

    /// Updates the list of valid connection addresses.
    ///
    /// This does not invalidate existing connections: as long as those connections continue to stay up, they will be kept by
    /// this client. However, no new connections will be made to any endpoint that is not in `newAddresses`.
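    ///
    /// A hypothetical example, steering all future connections to a replica (the hostname is a placeholder):
    ///
    ///     let replica = try SocketAddress.makeAddressResolvingHost("replica.example.com", port: 6379)
    ///     pool.updateConnectionAddresses([replica])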
    public func updateConnectionAddresses(_ newAddresses: [SocketAddress]) {
        self.poolLoggerLock.withLockVoid {
            self.poolLogger.info("Updated pool with new addresses", metadata: ["new-addresses": "\(newAddresses)"])
        }

        self.loop.execute {
            self.serverConnectionAddresses.update(newAddresses)
        }
    }

    private func connectionFactory(_ targetLoop: EventLoop) -> EventLoopFuture<RedisConnection> {
        // Validate the loop invariants.
        self.loop.preconditionInEventLoop()
        targetLoop.preconditionInEventLoop()

        guard let nextTarget = self.serverConnectionAddresses.nextTarget() else {
            // No valid connection target, we'll fail.
            return targetLoop.makeFailedFuture(RedisConnectionPoolError.noAvailableConnectionTargets)
        }

        return RedisConnection.connect(
            to: nextTarget,
            on: targetLoop,
            password: self.connectionPassword,
            logger: self.connectionLogger,
            tcpClient: self.connectionTCPClient
        )
    }
}

// MARK: RedisClient conformance
extension RedisConnectionPool: RedisClient {
    public var eventLoop: EventLoop {
        return self.loop
    }

    public var logger: Logger {
        return self.poolLoggerLock.withLock {
            return self.poolLogger
        }
    }

    public func setLogging(to logger: Logger) {
        var logger = logger
        logger[metadataKey: String(describing: RedisConnectionPool.self)] = "\(self.poolID)"

        self.poolLoggerLock.withLock {
            self.poolLogger = logger

            // We must enqueue this before we drop the lock to prevent a race on setting this logger.
            self.loop.execute {
                self.pool?.setLogger(logger)
            }
        }
    }

    public func send(command: String, with arguments: [RESPValue]) -> EventLoopFuture<RESPValue> {
        // Establish event loop context then jump to the in-loop version.
        return self.loop.flatSubmit {
            return self._send(command: command, with: arguments)
        }
    }

    private func _send(command: String, with arguments: [RESPValue]) -> EventLoopFuture<RESPValue> {
        self.loop.preconditionInEventLoop()

        guard let pool = self.pool else {
            return self.loop.makeFailedFuture(RedisConnectionPoolError.poolClosed)
        }

        // For now we have to default the deadline. For maximum compatibility with the existing implementation,
        // we use a fairly long timeout: one minute.
        return pool.leaseConnection(deadline: .now() + .seconds(60)).flatMap { connection in
            connection.sendCommandsImmediately = true
            return connection.send(command: command, with: arguments).always { _ in
                pool.returnConnection(connection)
            }
        }
    }
}

// MARK: Helper for round-robin connection establishment
extension RedisConnectionPool {
    /// A helper structure for valid connection addresses. This structure implements round-robin connection establishment.
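    ///
    /// For example, given addresses `[a, b, c]`, successive calls to `nextTarget()` yield `a, b, c, a, b, ...`;
    /// replacing the list via `update(_:)` restarts the rotation from the beginning of the new list.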
    private struct ConnectionAddresses {
        private var addresses: [SocketAddress]

        private var index: Array<SocketAddress>.Index

        init(initialAddresses: [SocketAddress]) {
            self.addresses = initialAddresses
            self.index = self.addresses.startIndex
        }

        mutating func nextTarget() -> SocketAddress? {
            // Early exit on an empty address list makes life easier.
            guard self.addresses.count > 0 else {
                self.index = self.addresses.startIndex
                return nil
            }

            // It's an invariant of this function that the index is always valid for subscripting the collection.
            let nextTarget = self.addresses[self.index]
            self.addresses.formIndex(after: &self.index)
            if self.index == self.addresses.endIndex {
                self.index = self.addresses.startIndex
            }
            return nextTarget
        }

        mutating func update(_ newAddresses: [SocketAddress]) {
            self.addresses = newAddresses
            self.index = self.addresses.startIndex
        }
    }
}

/// `RedisConnectionPoolSize` controls how the maximum number of connections in a pool is interpreted.
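///
/// A sketch of the difference between the two strategies (the count of 10 is illustrative):
///
///     // Hard cap: never more than 10 live connections; excess demand waits for a returned connection.
///     let strict: RedisConnectionPoolSize = .maximumActiveConnections(10)
///     // Leaky: at most 10 idle connections are kept, but the pool may burst above 10 under load.
///     let bursty: RedisConnectionPoolSize = .maximumPreservedConnections(10)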
public enum RedisConnectionPoolSize {
    /// The pool will allow no more than this number of connections to be "active" (that is, connecting, in-use,
    /// or pooled) at any one time. This will force possible future users of new connections to wait until a currently
    /// active connection becomes available by being returned to the pool, but provides a hard upper limit on concurrency.
    case maximumActiveConnections(Int)

    /// The pool will store up to this number of connections that are not currently in use. However, if the pool is
    /// asked for more connections at one time than this number, it will create new connections to serve the waiters.
    /// These "extra" connections are not preserved: they will be used to satisfy waiting demand if needed, but they
    /// will be discarded rather than returned to the pool once load drops low enough. This does not provide a hard
    /// upper bound on concurrency, but does provide an upper bound on resource usage when load is low.
    case maximumPreservedConnections(Int)

    internal var size: Int {
        switch self {
        case .maximumActiveConnections(let size), .maximumPreservedConnections(let size):
            return size
        }
    }

    internal var leaky: Bool {
        switch self {
        case .maximumActiveConnections:
            return false
        case .maximumPreservedConnections:
            return true
        }
    }
}