|
12 | 12 | //
|
13 | 13 | //===----------------------------------------------------------------------===//
|
14 | 14 |
|
15 |
| -import struct Foundation.UUID |
16 |
| -import struct Dispatch.DispatchTime |
17 |
| -import Logging |
18 |
| -import Metrics |
19 | 15 | import NIO
|
20 |
| -import NIOConcurrencyHelpers |
21 | 16 |
|
22 | 17 | /// An object capable of sending commands and receiving responses.
|
23 | 18 | ///
|
@@ -46,144 +41,3 @@ extension RedisClient {
|
46 | 41 | return self.send(command: command, with: [])
|
47 | 42 | }
|
48 | 43 | }
|
49 |
| - |
50 |
| -private let loggingKeyID = "RedisConnection" |
51 |
| - |
52 |
| -/// A `RedisClient` implementation that represents an individual connection |
53 |
| -/// to a Redis database instance. |
54 |
| -/// |
55 |
| -/// `RedisConnection` comes with logging by default. |
56 |
| -/// |
57 |
| -/// See `RedisClient` |
58 |
| -public final class RedisConnection: RedisClient { |
59 |
| - public static let defaultPort = 6379 |
60 |
| - |
61 |
| - private enum ConnectionState { |
62 |
| - case open |
63 |
| - case closed |
64 |
| - } |
65 |
| - |
66 |
| - /// See `RedisClient.eventLoop` |
67 |
| - public var eventLoop: EventLoop { return channel.eventLoop } |
68 |
| - /// Is the client still connected to Redis? |
69 |
| - public var isConnected: Bool { return state != .closed } |
70 |
| - /// Controls the timing behavior of sending commands over this connection. The default is `true`. |
71 |
| - /// |
72 |
| - /// When set to `false`, the host will "queue" commands and determine when to send all at once, |
73 |
| - /// while `true` will force each command to be sent as soon as they are "queued". |
74 |
| - /// - Note: Setting this to `true` will trigger all "queued" commands to be sent. |
75 |
| - public var sendCommandsImmediately: Bool { |
76 |
| - get { return autoflush.load() } |
77 |
| - set(newValue) { |
78 |
| - if newValue { channel.flush() } |
79 |
| - autoflush.store(newValue) |
80 |
| - } |
81 |
| - } |
82 |
| - |
83 |
| - let channel: Channel |
84 |
| - private var logger: Logger |
85 |
| - |
86 |
| - private let autoflush = Atomic<Bool>(value: true) |
87 |
| - private let _stateLock = Lock() |
88 |
| - private var _state: ConnectionState |
89 |
| - private var state: ConnectionState { |
90 |
| - get { return _stateLock.withLock { self._state } } |
91 |
| - set(newValue) { _stateLock.withLockVoid { self._state = newValue } } |
92 |
| - } |
93 |
| - |
94 |
| - deinit { |
95 |
| - if isConnected { |
96 |
| - assertionFailure("close() was not called before deinit!") |
97 |
| - logger.warning("RedisConnection did not properly shutdown before deinit!") |
98 |
| - } |
99 |
| - } |
100 |
| - |
101 |
| - /// Creates a new connection on the provided `Channel`. |
102 |
| - /// - Important: Call `close()` before deinitializing to properly cleanup resources. |
103 |
| - /// - Note: This connection will take ownership of the channel. |
104 |
| - /// - Parameters: |
105 |
| - /// - channel: The `Channel` to read and write from. |
106 |
| - /// - logger: The `Logger` instance to use for all logging purposes. |
107 |
| - public init(channel: Channel, logger: Logger = Logger(label: "RedisNIO.RedisConnection")) { |
108 |
| - self.channel = channel |
109 |
| - self.logger = logger |
110 |
| - |
111 |
| - self.logger[metadataKey: loggingKeyID] = "\(UUID())" |
112 |
| - self.logger.debug("Connection created.") |
113 |
| - self._state = .open |
114 |
| - RedisMetrics.activeConnectionCount += 1 |
115 |
| - RedisMetrics.totalConnectionCount.increment() |
116 |
| - } |
117 |
| - |
118 |
| - /// Sends a `QUIT` command, then closes the `Channel` this instance was initialized with. |
119 |
| - /// |
120 |
| - /// See [https://redis.io/commands/quit](https://redis.io/commands/quit) |
121 |
| - /// - Returns: An `EventLoopFuture` that resolves when the connection has been closed. |
122 |
| - @discardableResult |
123 |
| - public func close() -> EventLoopFuture<Void> { |
124 |
| - guard isConnected else { |
125 |
| - logger.notice("Connection received more than one close() request.") |
126 |
| - return channel.eventLoop.makeSucceededFuture(()) |
127 |
| - } |
128 |
| - |
129 |
| - let result = send(command: "QUIT") |
130 |
| - .flatMap { _ in |
131 |
| - let promise = self.channel.eventLoop.makePromise(of: Void.self) |
132 |
| - self.channel.close(promise: promise) |
133 |
| - return promise.futureResult |
134 |
| - } |
135 |
| - .map { |
136 |
| - self.logger.debug("Connection closed.") |
137 |
| - RedisMetrics.activeConnectionCount -= 1 |
138 |
| - } |
139 |
| - .recover { |
140 |
| - self.logger.error("Encountered error during close(): \($0)") |
141 |
| - self.state = .open |
142 |
| - } |
143 |
| - |
144 |
| - // setting it to closed now prevents multiple close() chains, but doesn't stop the QUIT command |
145 |
| - // if the connection wasn't closed, it's reset in the callback chain |
146 |
| - state = .closed |
147 |
| - |
148 |
| - return result |
149 |
| - } |
150 |
| - |
151 |
| - /// Sends commands to the Redis instance this connection is tied to. |
152 |
| - /// |
153 |
| - /// See `RedisClient.send(command:with:)` |
154 |
| - /// |
155 |
| - /// - Note: The timing of when commands are actually sent to Redis are controlled by |
156 |
| - /// the `sendCommandsImmediately` property. |
157 |
| - public func send( |
158 |
| - command: String, |
159 |
| - with arguments: [RESPValue] |
160 |
| - ) -> EventLoopFuture<RESPValue> { |
161 |
| - guard isConnected else { |
162 |
| - logger.error("\(RedisNIOError.connectionClosed.localizedDescription)") |
163 |
| - return channel.eventLoop.makeFailedFuture(RedisNIOError.connectionClosed) |
164 |
| - } |
165 |
| - |
166 |
| - var commandParts: [RESPValue] = [.init(bulk: command)] |
167 |
| - commandParts.append(contentsOf: arguments) |
168 |
| - |
169 |
| - let promise = channel.eventLoop.makePromise(of: RESPValue.self) |
170 |
| - let context = RedisCommand( |
171 |
| - command: .array(commandParts), |
172 |
| - promise: promise |
173 |
| - ) |
174 |
| - |
175 |
| - let startTime = DispatchTime.now().uptimeNanoseconds |
176 |
| - promise.futureResult.whenComplete { result in |
177 |
| - let duration = DispatchTime.now().uptimeNanoseconds - startTime |
178 |
| - RedisMetrics.commandRoundTripTime.recordNanoseconds(duration) |
179 |
| - guard case let .failure(error) = result else { return } |
180 |
| - self.logger.error("\(error.localizedDescription)") |
181 |
| - } |
182 |
| - logger.debug("Sending command \"\(command)\" with \(arguments)") |
183 |
| - |
184 |
| - guard sendCommandsImmediately else { |
185 |
| - return channel.write(context).flatMap { promise.futureResult } |
186 |
| - } |
187 |
| - return channel.writeAndFlush(context).flatMap { promise.futureResult } |
188 |
| - } |
189 |
| -} |
0 commit comments