@@ -15,6 +15,7 @@ import struct Foundation.UUID
15
15
import NIO
16
16
import NIOConcurrencyHelpers
17
17
import Logging
18
+ import ServiceDiscovery
18
19
19
20
/// A `RedisConnectionPool` is an implementation of `RedisClient` backed by a pool of connections to Redis,
20
21
/// rather than a single one.
@@ -50,6 +51,15 @@ public class RedisConnectionPool {
50
51
// This array buffers any request for a connection that cannot be succeeded right away in the case where we have no target.
51
52
// We never allow this to get larger than a specific bound, to resist DoS attacks. Past that bound we will fast-fail.
52
53
private var requestsForConnections : [ EventLoopPromise < RedisConnection > ] = [ ]
54
+ // This is var because if we're using service discovery, we don't start doing that until activate is called.
55
+ private var cancellationToken : CancellationToken ? {
56
+ willSet {
57
+ guard let token = self . cancellationToken, !token. isCancelled, token !== newValue else {
58
+ return
59
+ }
60
+ token. cancel ( )
61
+ }
62
+ }
53
63
54
64
/// The maximum number of connection requests we'll buffer in `requestsForConnections` before we start fast-failing. These
55
65
/// are buffered only when there are no available addresses to connect to, so in practice it's highly unlikely this will be
@@ -88,6 +98,33 @@ public class RedisConnectionPool {
88
98
}
89
99
}
90
100
101
+ // MARK: Alternative initializers
102
+ extension RedisConnectionPool {
103
+ /// Constructs a `RedisConnectionPool` that updates its addresses based on information from
104
+ /// service discovery.
105
+ ///
106
+ /// This constructor behaves similarly to the regular constructor. However, it also activates the
107
+ /// connection pool before returning it to the user. This is necessary because the act of subscribing
108
+ /// to service discovery forms a reference cycle between the service discovery instance and the
109
+ /// `RedisConnectionPool`. Pools constructed via this constructor _must_ always have `close` called
110
+ /// on them.
111
+ ///
112
+ /// Pools created via this constructor will be auto-closed when the service discovery instance is completed for
113
+ /// any reason, including on error. Users should still always call `close` in their own code during teardown.
114
+ public static func activatedServiceDiscoveryPool< Discovery: ServiceDiscovery > (
115
+ service: Discovery . Service ,
116
+ discovery: Discovery ,
117
+ configuration: Configuration ,
118
+ boundEventLoop: EventLoop ,
119
+ logger: Logger ? = nil
120
+ ) -> RedisConnectionPool where Discovery. Instance == SocketAddress {
121
+ let pool = RedisConnectionPool ( configuration: configuration, boundEventLoop: boundEventLoop)
122
+ pool. beginSubscribingToServiceDiscovery ( service: service, discovery: discovery, logger: logger)
123
+ pool. activate ( logger: logger)
124
+ return pool
125
+ }
126
+ }
127
+
91
128
// MARK: General helpers.
92
129
extension RedisConnectionPool {
93
130
/// Starts the connection pool.
@@ -122,6 +159,9 @@ extension RedisConnectionPool {
122
159
for request in self . requestsForConnections {
123
160
request. fail ( RedisConnectionPoolError . poolClosed)
124
161
}
162
+
163
+ // This cancels service discovery.
164
+ self . cancellationToken = nil
125
165
}
126
166
}
127
167
@@ -247,6 +287,36 @@ extension RedisConnectionPool {
247
287
logger [ metadataKey: RedisLogging . MetadataKeys. connectionPoolID] = " \( self . id) "
248
288
return logger
249
289
}
290
+
291
+ /// A private helper function used for the service discovery constructor.
292
+ private func beginSubscribingToServiceDiscovery< Discovery: ServiceDiscovery > (
293
+ service: Discovery . Service ,
294
+ discovery: Discovery ,
295
+ logger: Logger ?
296
+ ) where Discovery. Instance == SocketAddress {
297
+ self . loop. execute {
298
+ let logger = self . prepareLoggerForUse ( logger)
299
+
300
+ self . cancellationToken = discovery. subscribe (
301
+ to: service,
302
+ onNext: { result in
303
+ // This closure may execute on any thread.
304
+ self . loop. execute {
305
+ switch result {
306
+ case . success( let targets) :
307
+ self . updateConnectionAddresses ( targets, logger: logger)
308
+ case . failure( let error) :
309
+ logger. error ( " Service discovery error " , metadata: [ RedisLogging . MetadataKeys. error: " \( error) " ] )
310
+ }
311
+ }
312
+ } ,
313
+ onComplete: { ( _: CompletionReason ) in
314
+ // We don't really care about the reason, we just want to brick this client.
315
+ self . close ( logger: logger)
316
+ }
317
+ )
318
+ }
319
+ }
250
320
}
251
321
252
322
// MARK: RedisClient conformance
0 commit comments