|
| 1 | +import NIO |
| 2 | +import NIOConcurrencyHelpers |
| 3 | + |
| 4 | +/// A connection to a Redis database instance, with the ability to send and receive commands. |
| 5 | +/// |
| 6 | +/// let result = connection.send(command: "GET", arguments: ["my_key"] |
| 7 | +/// // result == EventLoopFuture<RESPValue> |
| 8 | +/// |
| 9 | +/// See https://redis.io/commands |
| 10 | +public final class RedisConnection { |
| 11 | + /// The `Channel` this connection is associated with. |
| 12 | + public let channel: Channel |
| 13 | + |
| 14 | + /// Has the connection been closed? |
| 15 | + public private(set) var isClosed = Atomic<Bool>(value: false) |
| 16 | + |
| 17 | + deinit { assert(isClosed.load(), "Redis connection was not properly shut down!") } |
| 18 | + |
| 19 | + /// Creates a new connection on the provided channel. |
| 20 | + /// - Note: This connection will take ownership of the `Channel` object. |
| 21 | + /// - Important: Call `close()` before deinitializing to properly cleanup resources. |
| 22 | + public init(channel: Channel) { |
| 23 | + self.channel = channel |
| 24 | + } |
| 25 | + |
| 26 | + /// Closes the connection to Redis. |
| 27 | + /// - Returns: An `EventLoopFuture` that resolves when the connection has been closed. |
| 28 | + @discardableResult |
| 29 | + public func close() -> EventLoopFuture<Void> { |
| 30 | + guard isClosed.exchange(with: true) else { return channel.eventLoop.makeSucceededFuture(result: ()) } |
| 31 | + |
| 32 | + let promise = channel.eventLoop.makePromise(of: Void.self) |
| 33 | + |
| 34 | + channel.close(promise: promise) |
| 35 | + |
| 36 | + return promise.futureResult |
| 37 | + } |
| 38 | + |
| 39 | + /// Sends the desired command with the specified arguments. |
| 40 | + /// - Parameters: |
| 41 | + /// - command: The command to execute. |
| 42 | + /// - arguments: The arguments to be sent with the command. |
| 43 | + /// - Returns: An `EventLoopFuture` that will resolve with the Redis command response. |
| 44 | + public func send(command: String, arguments: [RESPConvertible] = []) throws -> EventLoopFuture<RESPValue> { |
| 45 | + guard !isClosed.load() else { |
| 46 | + return channel.eventLoop.makeFailedFuture(error: RedisError.connectionClosed) |
| 47 | + } |
| 48 | + |
| 49 | + let promise = channel.eventLoop.makePromise(of: RESPValue.self) |
| 50 | + let args = try arguments.map { try $0.convertToRESP() } |
| 51 | + let context = RedisCommandContext( |
| 52 | + command: .array([RESPValue(bulk: command)] + args), |
| 53 | + promise: promise |
| 54 | + ) |
| 55 | + |
| 56 | + #warning("TODO - Pipelining") |
| 57 | + _ = channel.writeAndFlush(context) |
| 58 | + |
| 59 | + return promise.futureResult |
| 60 | + } |
| 61 | +} |
0 commit comments