Skip to content

Commit 2d95af5

Browse files
committed
Add RedisPipeline to eventually replace NIORedisPipeline that works more deeply with NIO
1 parent 1469282 commit 2d95af5

File tree

3 files changed

+76
-1
lines changed

3 files changed

+76
-1
lines changed

Sources/NIORedis/NIORedisPipeline.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import NIO
1313
/// // results[1].int == Optional(4)
1414
/// - Important: The larger the pipeline queue, the more memory both NIORedis and Redis will use.
1515
/// See https://redis.io/topics/pipelining#redis-pipelining
16+
@available(*, deprecated)
1617
public final class NIORedisPipeline {
1718
/// The client to execute the commands on.
1819
private let connection: NIORedisConnection

Sources/NIORedis/RedisConnection.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,13 @@ public final class RedisConnection {
5353
promise: promise
5454
)
5555

56-
#warning("TODO - Pipelining")
5756
_ = channel.writeAndFlush(context)
5857

5958
return promise.futureResult
6059
}
60+
61+
/// Creates a `RedisPipeline` for executing a batch of commands.
62+
public func makePipeline() -> RedisPipeline {
63+
return .init(channel: channel)
64+
}
6165
}

Sources/NIORedis/RedisPipeline.swift

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import Foundation
2+
3+
/// An object that provides a mechanism to "pipeline" multiple Redis commands in sequence,
4+
/// providing an aggregate response of all the Redis responses for each individual command.
5+
///
6+
/// let results = connection.makePipeline()
7+
/// .enqueue(command: "SET", arguments: ["my_key", 3])
8+
/// .enqueue(command: "INCR", arguments: ["my_key"])
9+
/// .execute()
10+
/// // results == Future<[RESPValue]>
11+
/// // results[0].string == Optional("OK")
12+
/// // results[1].int == Optional(4)
13+
///
14+
/// See https://redis.io/topics/pipelining#redis-pipelining
15+
/// - Important: The larger the pipeline queue, the more memory both NIORedis and Redis will use.
16+
public final class RedisPipeline {
17+
/// The number of commands in the pipeline.
18+
public var count: Int {
19+
return queuedCommandResults.count
20+
}
21+
22+
/// The channel being used to send commands with.
23+
private let channel: Channel
24+
25+
/// The queue of response handlers that have been queued.
26+
private var queuedCommandResults: [EventLoopFuture<RESPValue>]
27+
28+
/// Creates a new pipeline queue that will write to the channel provided.
29+
/// - Parameter channel: The `Channel` to write to.
30+
public init(channel: Channel) {
31+
self.channel = channel
32+
self.queuedCommandResults = []
33+
}
34+
35+
/// Queues the provided command and arguments to be executed when `execute()` is invoked.
36+
/// - Parameters:
37+
/// - command: The command to execute. See https://redis.io/commands
38+
/// - arguments: The arguments, if any, to send with the command.
39+
/// - Returns: A self-reference for chaining commands.
40+
@discardableResult
41+
public func enqueue(command: String, arguments: [RESPConvertible] = []) throws -> RedisPipeline {
42+
let args = try arguments.map { try $0.convertToRESP() }
43+
44+
let promise = channel.eventLoop.makePromise(of: RESPValue.self)
45+
let context = RedisCommandContext(
46+
command: .array([RESPValue(bulk: command)] + args),
47+
promise: promise
48+
)
49+
50+
queuedCommandResults.append(promise.futureResult)
51+
52+
_ = channel.write(context)
53+
54+
return self
55+
}
56+
57+
/// Flushes the queue, sending all of the commands to Redis.
58+
/// - Important: If any of the commands fail, the remaining commands will not execute and the `EventLoopFuture` will fail.
59+
/// - Returns: An `EventLoopFuture` that resolves the `RESPValue` responses, in the same order as the command queue.
60+
public func execute() -> EventLoopFuture<[RESPValue]> {
61+
channel.flush()
62+
63+
return EventLoopFuture<[RESPValue]>.reduce(
64+
into: [],
65+
queuedCommandResults,
66+
eventLoop: channel.eventLoop,
67+
{ (results, response) in results.append(response) }
68+
)
69+
}
70+
}

0 commit comments

Comments
 (0)