1
1
import NIO
2
2
import NIOConcurrencyHelpers
3
+ import NIOSSL
3
4
import RediStack
4
5
5
6
/// A client for interfacing with a Redis instance.
6
- public struct RedisClient : Service {
7
+ public struct RedisClient : Service , RediStack . RedisClient {
7
8
public struct Identifier : ServiceIdentifier {
8
9
private let hashable : AnyHashable
9
10
public init ( hashable: AnyHashable ) { self . hashable = hashable }
@@ -20,15 +21,77 @@ public struct RedisClient: Service {
20
21
try provider. shutdown ( )
21
22
}
22
23
24
+ // MARK: RediStack.RedisClient
25
+
26
+ public var eventLoop : EventLoop {
27
+ Loop . current
28
+ }
29
+
30
+ public func logging( to logger: Logger ) -> RediStack . RedisClient {
31
+ provider. logging ( to: logger)
32
+ }
33
+
34
+ public func send( command: String , with arguments: [ RESPValue ] ) -> EventLoopFuture < RESPValue > {
35
+ wrapError {
36
+ try provider. getClient ( )
37
+ . send ( command: command, with: arguments) . hop ( to: Loop . current)
38
+ }
39
+ }
40
+
41
+ public func subscribe(
42
+ to channels: [ RedisChannelName ] ,
43
+ messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver ,
44
+ onSubscribe subscribeHandler: RedisSubscriptionChangeHandler ? ,
45
+ onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler ?
46
+ ) -> EventLoopFuture < Void > {
47
+ wrapError {
48
+ try provider. getClient ( )
49
+ . subscribe (
50
+ to: channels,
51
+ messageReceiver: receiver,
52
+ onSubscribe: subscribeHandler,
53
+ onUnsubscribe: unsubscribeHandler
54
+ )
55
+ }
56
+ }
57
+
58
+ public func psubscribe(
59
+ to patterns: [ String ] ,
60
+ messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver ,
61
+ onSubscribe subscribeHandler: RedisSubscriptionChangeHandler ? ,
62
+ onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler ?
63
+ ) -> EventLoopFuture < Void > {
64
+ wrapError {
65
+ try provider. getClient ( )
66
+ . psubscribe (
67
+ to: patterns,
68
+ messageReceiver: receiver,
69
+ onSubscribe: subscribeHandler,
70
+ onUnsubscribe: unsubscribeHandler
71
+ )
72
+ }
73
+ }
74
+
75
+ public func unsubscribe( from channels: [ RedisChannelName ] ) -> EventLoopFuture < Void > {
76
+ wrapError { try provider. getClient ( ) . unsubscribe ( from: channels) }
77
+ }
78
+
79
+ public func punsubscribe( from patterns: [ String ] ) -> EventLoopFuture < Void > {
80
+ wrapError { try provider. getClient ( ) . punsubscribe ( from: patterns) }
81
+ }
82
+
83
+ // MARK: Creating
84
+
23
85
/// A single redis connection
24
86
public static func connection(
25
87
_ host: String ,
26
88
port: Int = 6379 ,
27
89
password: String ? = nil ,
28
90
database: Int ? = nil ,
29
- poolSize: RedisConnectionPoolSize = . maximumActiveConnections( 1 )
91
+ poolSize: RedisConnectionPoolSize = . maximumActiveConnections( 1 ) ,
92
+ tlsConfiguration: TLSConfiguration ? = nil
30
93
) -> RedisClient {
31
- return . cluster( . ip( host: host, port: port) , password: password, database: database, poolSize: poolSize)
94
+ return . cluster( . ip( host: host, port: port) , password: password, database: database, poolSize: poolSize, tlsConfiguration : tlsConfiguration )
32
95
}
33
96
34
97
/// Convenience initializer for creating a redis client with the
@@ -48,29 +111,21 @@ public struct RedisClient: Service {
48
111
_ sockets: Socket ... ,
49
112
password: String ? = nil ,
50
113
database: Int ? = nil ,
51
- poolSize: RedisConnectionPoolSize = . maximumActiveConnections( 1 )
114
+ poolSize: RedisConnectionPoolSize = . maximumActiveConnections( 1 ) ,
115
+ tlsConfiguration: TLSConfiguration ? = nil
52
116
) -> RedisClient {
53
117
return . configuration(
54
118
RedisConnectionPool . Configuration (
55
- initialServerConnectionAddresses: sockets. map {
56
- do {
57
- switch $0 {
58
- case let . ip( host, port) :
59
- return try . makeAddressResolvingHost( host, port: port)
60
- case let . unix( path) :
61
- return try . init( unixDomainSocketPath: path)
62
- }
63
- } catch {
64
- fatalError ( " Error generating socket address from `Socket` \( self ) ! " )
65
- }
66
- } ,
119
+ initialServerConnectionAddresses: [ ] ,
67
120
maximumConnectionCount: poolSize,
68
121
connectionFactoryConfiguration: RedisConnectionPool . ConnectionFactoryConfiguration (
69
122
connectionInitialDatabase: database,
70
123
connectionPassword: password,
71
- connectionDefaultLogger: Log . logger
124
+ connectionDefaultLogger: Log . logger,
125
+ tlsConfiguration: tlsConfiguration
72
126
)
73
- )
127
+ ) ,
128
+ addresses: sockets
74
129
)
75
130
}
76
131
@@ -83,44 +138,36 @@ public struct RedisClient: Service {
83
138
public static func configuration( _ config: RedisConnectionPool . Configuration ) -> RedisClient {
84
139
return RedisClient ( provider: ConnectionPool ( config: config) )
85
140
}
86
- }
87
-
88
- /// Under the hood provider for `Redis`. Used so either connection pools
89
- /// or connections can be injected into `Redis` for accessing redis.
90
- public protocol RedisProvider {
91
- /// Get a redis client for running commands.
92
- func getClient( ) -> RediStack . RedisClient
93
141
94
- /// Shut down.
95
- func shutdown( ) throws
96
-
97
- /// Runs a transaction on the redis client using a given closure.
98
- ///
99
- /// - Parameter transaction: An asynchronous transaction to run on
100
- /// the connection.
101
- /// - Returns: The resulting value of the transaction.
102
- func transaction< T> ( _ transaction: @escaping ( RedisProvider ) async throws -> T ) async throws -> T
142
+ fileprivate static func configuration( _ config: RedisConnectionPool . Configuration , addresses: [ Socket ] ) -> RedisClient {
143
+ return RedisClient ( provider: ConnectionPool ( config: config, lazyAddresses: addresses) )
144
+ }
103
145
}
104
146
105
147
/// A connection pool is a redis provider with a pool per `EventLoop`.
106
- private final class ConnectionPool : RedisProvider {
148
+ private final class ConnectionPool : RedisProvider , RediStack . RedisClient {
107
149
/// Map of `EventLoop` identifiers to respective connection pools.
108
150
private var poolStorage : [ ObjectIdentifier : RedisConnectionPool ] = [ : ]
109
151
private var poolLock = Lock ( )
152
+ private var lazyAddresses : [ Socket ] ?
153
+ private var logger : Logger ?
110
154
111
155
/// The configuration to create pools with.
112
156
private var config : RedisConnectionPool . Configuration
113
157
114
- init ( config: RedisConnectionPool . Configuration ) {
158
+ init ( config: RedisConnectionPool . Configuration , lazyAddresses : [ Socket ] ? = nil ) {
115
159
self . config = config
160
+ self . lazyAddresses = lazyAddresses
116
161
}
162
+
163
+ // MARK: - RedisProvider
117
164
118
- func getClient( ) -> RediStack . RedisClient {
119
- getPool ( )
165
+ func getClient( ) throws -> RediStack . RedisClient {
166
+ try getPool ( )
120
167
}
121
168
122
169
func transaction< T> ( _ transaction: @escaping ( RedisProvider ) async throws -> T ) async throws -> T {
123
- let pool = getPool ( )
170
+ let pool = try getPool ( )
124
171
return try await pool. leaseConnection { conn in
125
172
pool. eventLoop. asyncSubmit { try await transaction ( conn) }
126
173
} . get ( )
@@ -140,17 +187,88 @@ private final class ConnectionPool: RedisProvider {
140
187
///
141
188
/// - Returns: A `RedisConnectionPool` associated with the current
142
189
/// `EventLoop` for sending commands to.
143
- private func getPool( ) -> RedisConnectionPool {
190
+ private func getPool( ) throws -> RedisConnectionPool {
144
191
let loop = Loop . current
145
192
let key = ObjectIdentifier ( loop)
146
- return poolLock. withLock {
193
+ return try poolLock. withLock {
147
194
if let pool = self . poolStorage [ key] {
148
195
return pool
149
196
} else {
150
- let newPool = RedisConnectionPool ( configuration: self . config, boundEventLoop: loop)
197
+ var config = self . config
198
+ if let lazyAddresses = lazyAddresses {
199
+ let initialAddresses : [ SocketAddress ] = try lazyAddresses. map {
200
+ switch $0 {
201
+ case let . ip( host, port) :
202
+ return try . makeAddressResolvingHost( host, port: port)
203
+ case let . unix( path) :
204
+ return try . init( unixDomainSocketPath: path)
205
+ }
206
+ }
207
+
208
+ config = RedisConnectionPool . Configuration (
209
+ initialServerConnectionAddresses: initialAddresses,
210
+ maximumConnectionCount: config. maximumConnectionCount,
211
+ connectionFactoryConfiguration: config. factoryConfiguration,
212
+ minimumConnectionCount: config. minimumConnectionCount,
213
+ connectionBackoffFactor: config. connectionRetryConfiguration. backoff. factor,
214
+ initialConnectionBackoffDelay: config. connectionRetryConfiguration. backoff. initialDelay,
215
+ connectionRetryTimeout: config. connectionRetryConfiguration. timeout,
216
+ poolDefaultLogger: config. poolDefaultLogger)
217
+ }
218
+
219
+ let newPool = RedisConnectionPool ( configuration: config, boundEventLoop: loop)
151
220
self . poolStorage [ key] = newPool
152
- return newPool
221
+ if let logger = logger {
222
+ return newPool. logging ( to: logger) as? RedisConnectionPool ?? newPool
223
+ } else {
224
+ return newPool
225
+ }
153
226
}
154
227
}
155
228
}
229
+
230
+ // MARK: RediStack.RedisClient
231
+
232
+ var eventLoop : EventLoop { Loop . current }
233
+
234
+ func logging( to logger: Logger ) -> RediStack . RedisClient {
235
+ self . logger = logger
236
+ return self
237
+ }
238
+
239
+ func punsubscribe( from patterns: [ String ] ) -> EventLoopFuture < Void > {
240
+ wrapError { try getClient ( ) . punsubscribe ( from: patterns) }
241
+ }
242
+
243
+ func unsubscribe( from channels: [ RedisChannelName ] ) -> EventLoopFuture < Void > {
244
+ wrapError { try getClient ( ) . unsubscribe ( from: channels) }
245
+ }
246
+
247
+ func send( command: String , with arguments: [ RESPValue ] ) -> EventLoopFuture < RESPValue > {
248
+ wrapError { try getClient ( ) . send ( command: command, with: arguments) }
249
+ }
250
+
251
+ private func wrapError< T> ( _ closure: ( ) throws -> EventLoopFuture < T > ) -> EventLoopFuture < T > {
252
+ do { return try closure ( ) }
253
+ catch { return Loop . current. makeFailedFuture ( error) }
254
+ }
255
+ }
256
+
257
+ extension RedisConnection : RedisProvider {
258
+ public func getClient( ) -> RediStack . RedisClient {
259
+ self
260
+ }
261
+
262
+ public func shutdown( ) throws {
263
+ try close ( ) . wait ( )
264
+ }
265
+
266
+ public func transaction< T> ( _ transaction: @escaping ( RedisProvider ) async throws -> T ) async throws -> T {
267
+ try await transaction ( self )
268
+ }
269
+ }
270
+
271
+ private func wrapError< T> ( _ closure: ( ) throws -> EventLoopFuture < T > ) -> EventLoopFuture < T > {
272
+ do { return try closure ( ) }
273
+ catch { return Loop . current. makeFailedFuture ( error) }
156
274
}
0 commit comments