@@ -14,29 +14,12 @@ import Logging
14
14
///
15
15
/// See [https://redis.io/topics/pipelining#redis-pipelining](https://redis.io/topics/pipelining#redis-pipelining)
16
16
/// - Important: The larger the pipeline queue, the more memory both NIORedis and Redis will use.
17
- public protocol RedisPipeline {
17
+ ///
18
+ /// This implements `RedisClient` to use itself for enqueuing commands.
19
+ public final class RedisPipeline {
18
20
/// The number of commands in the pipeline.
19
- var count : Int { get }
20
-
21
- /// Queues an operation executed with the provided `RedisClient` that will be executed in sequence when
22
- /// `execute()` is invoked.
23
- ///
24
- /// let pipeline = connection.makePipeline()
25
- /// .enqueue { $0.set("my_key", "3") }
26
- /// .enqueue { $0.send(command: "INCR", with: ["my_key"]) }
27
- ///
28
- /// See `RedisClient`.
29
- /// - Parameter operation: The operation specified with `RedisClient` provided.
30
- /// - Returns: A self-reference for chaining commands.
31
- @discardableResult
32
- func enqueue< T> ( operation: ( RedisClient ) -> EventLoopFuture < T > ) -> RedisPipeline
33
-
34
- /// Flushes the queue, sending all of the commands to Redis.
35
- /// - Returns: An `EventLoopFuture` that resolves the `RESPValue` responses, in the same order as the command queue.
36
- func execute( ) -> EventLoopFuture < [ RESPValue ] >
37
- }
21
+ public var count : Int { return queuedCommandResults. count }
38
22
39
- public final class NIORedisPipeline {
40
23
private var logger : Logger
41
24
/// The channel being used to send commands with.
42
25
private let channel : Channel
@@ -46,7 +29,7 @@ public final class NIORedisPipeline {
46
29
47
30
/// Creates a new pipeline queue that will write to the channel provided.
48
31
/// - Parameter channel: The `Channel` to write to.
49
- public init ( channel: Channel , logger: Logger = Logger ( label: " NIORedis.Pipeline " ) ) {
32
+ public init ( channel: Channel , logger: Logger = Logger ( label: " NIORedis.RedisPipeline " ) ) {
50
33
self . channel = channel
51
34
self . logger = logger
52
35
self . queuedCommandResults = [ ]
@@ -56,24 +39,33 @@ public final class NIORedisPipeline {
56
39
}
57
40
}
58
41
59
- extension NIORedisPipeline : RedisPipeline {
60
- /// See `RedisPipeline.count`.
61
- public var count : Int {
62
- return queuedCommandResults. count
63
- }
42
+ // MARK: Enqueue & Execute
64
43
65
- /// See `RedisPipeline.enqueue(operation:)`.
44
+ extension RedisPipeline {
45
+ /// Queues an operation executed with the provided `RedisClient` that will be executed in
46
+ /// sequence when `execute()` is invoked.
47
+ ///
48
+ /// let pipeline = connection.makePipeline()
49
+ /// .enqueue { $0.set("my_key", to: 3) }
50
+ /// .enqueue { $0.increment("my_key") }
51
+ ///
52
+ /// See `RedisClient`.
53
+ /// - Parameter operation: The operation specified with the `RedisClient` provided.
54
+ /// - Returns: A self-reference for chaining commands.
66
55
@discardableResult
67
56
public func enqueue< T> ( operation: ( RedisClient ) -> EventLoopFuture < T > ) -> RedisPipeline {
68
57
// We are passing ourselves in as the executor instance,
69
- // and our implementation of `RedisCommandExecutor .send(command:with:) handles the actual queueing.
58
+ // and our implementation of `RedisClient .send(command:with:) handles the actual queueing.
70
59
_ = operation ( self )
71
60
logger. debug ( " Command queued. Pipeline size: \( count) " )
72
61
return self
73
62
}
74
63
75
- /// See `RedisPipeline.execute()`.
76
- /// - Important: If any of the commands fail, the remaining commands will not execute and the `EventLoopFuture` will fail.
64
+ /// Flushes the queue, sending all of the commands to Redis.
65
+ /// - Important: If any of the commands fail, the remaining commands will not execute
66
+ /// and the `EventLoopFuture` will fail.
67
+ /// - Returns: An `EventLoopFuture` that resolves the `RESPValue` responses,
68
+ /// in the same order as the command queue.
77
69
public func execute( ) -> EventLoopFuture < [ RESPValue ] > {
78
70
let response = EventLoopFuture< [ RESPValue] > . reduce(
79
71
into: [ ] ,
@@ -97,14 +89,21 @@ extension NIORedisPipeline: RedisPipeline {
97
89
}
98
90
}
99
91
100
- extension NIORedisPipeline : RedisClient {
101
- /// See `RedisCommandExecutor.eventLoop`.
102
- public var eventLoop : EventLoop { return self . channel. eventLoop }
92
+ // MARK: RedisClient
93
+
94
+ extension RedisPipeline : RedisClient {
95
+ /// See `RedisClient.eventLoop`
96
+ public var eventLoop : EventLoop { return channel. eventLoop }
103
97
104
98
/// Sends the command and arguments to a buffer to later be flushed when `execute()` is invoked.
105
- /// - Note: When working with a `NIORedisPipeline ` instance directly, it is preferred to use the
99
+ /// - Note: When working with a `RedisPipeline ` instance directly, it is preferred to use the
106
100
/// `RedisPipeline.enqueue(operation:)` method instead of `send(command:with:)`.
107
- public func send( command: String , with arguments: [ RESPValueConvertible ] = [ ] ) -> EventLoopFuture < RESPValue > {
101
+ ///
102
+ /// See `RedisClient.send(command:with:)`
103
+ public func send(
104
+ command: String ,
105
+ with arguments: [ RESPValueConvertible ] = [ ]
106
+ ) -> EventLoopFuture < RESPValue > {
108
107
let args = arguments. map { $0. convertedToRESPValue ( ) }
109
108
110
109
let promise = channel. eventLoop. makePromise ( of: RESPValue . self)
0 commit comments