@@ -16,11 +16,15 @@ import struct Foundation.UUID
16
16
import Logging
17
17
import NIO
18
18
19
- /// A context for `RedisCommandHandler` to operate within.
20
- public struct RedisCommandContext {
21
- /// A full command keyword and arguments stored as a single `RESPValue`.
19
+ /// The `NIO.ChannelOutboundHandler.OutboundIn` type for `RedisCommandHandler`.
20
+ ///
21
+ /// This holds the command and its arguments stored as a single `RESPValue` to be sent to Redis,
22
+ /// and an `NIO.EventLoopPromise` to be fulfilled when a response has been received.
23
+ /// - Important: This struct has _reference semantics_ due to the retention of the `NIO.EventLoopPromise`.
24
+ public struct RedisCommand {
25
+ /// A command keyword and its arguments stored as a single `RESPValue.array`.
22
26
public let command : RESPValue
23
- /// A promise expected to be fulfilled with the `RESPValue` response to the command from Redis.
27
+ /// A promise to be fulfilled with the sent command's response from Redis.
24
28
public let responsePromise : EventLoopPromise < RESPValue >
25
29
26
30
public init ( command: RESPValue , promise: EventLoopPromise < RESPValue > ) {
@@ -29,18 +33,28 @@ public struct RedisCommandContext {
29
33
}
30
34
}
31
35
32
- /// A `ChannelDuplexHandler` that works with `RedisCommandContext`s to send commands and forward responses.
33
- open class RedisCommandHandler {
34
- /// Queue of promises waiting to receive a response value from a sent command.
36
+ /// An object that operates in a First In, First Out (FIFO) request-response cycle.
37
+ ///
38
+ /// `RedisCommandHandler` is a `NIO.ChannelDuplexHandler` that sends `RedisCommand` instances to Redis,
39
+ /// and fulfills the command's `NIO.EventLoopPromise` as soon as a `RESPValue` response has been received from Redis.
40
+ public final class RedisCommandHandler {
41
+ /// FIFO queue of promises waiting to receive a response value from a sent command.
35
42
private var commandResponseQueue : CircularBuffer < EventLoopPromise < RESPValue > >
36
43
private var logger : Logger
37
44
38
45
deinit {
39
- guard commandResponseQueue. count > 0 else { return }
40
- logger. warning ( " Command handler deinit when queue is not empty. Current size: \( commandResponseQueue. count) " )
46
+ guard self . commandResponseQueue. count > 0 else { return }
47
+ self . logger [ metadataKey: " Queue Size " ] = " \( self . commandResponseQueue. count) "
48
+ self . logger. warning ( " Command handler deinit when queue is not empty " )
41
49
}
42
50
43
- public init ( logger: Logger = Logger ( label: " RedisNIO.CommandHandler " ) , initialQueueCapacity: Int = 5 ) {
51
+ /// - Parameters:
52
+ /// - initialQueueCapacity: The initial queue size to start with. The default is `3`. `RedisCommandHandler` stores all
53
+ /// `RedisCommand.responsePromise` objects into a buffer, and unless you intend to execute several concurrent commands against Redis,
54
+ /// and don't want the buffer to resize, you shouldn't need to set this parameter.
55
+ /// - logger: The `Logging.Logger` instance to use.
56
+ /// The logger will have a `Foundation.UUID` value attached as metadata to uniquely identify this instance.
57
+ public init ( initialQueueCapacity: Int = 3 , logger: Logger = Logger ( label: " RedisNIO.CommandHandler " ) ) {
44
58
self . commandResponseQueue = CircularBuffer ( initialCapacity: initialQueueCapacity)
45
59
self . logger = logger
46
60
self . logger [ metadataKey: " CommandHandler " ] = " \( UUID ( ) ) "
@@ -50,13 +64,16 @@ open class RedisCommandHandler {
50
64
// MARK: ChannelInboundHandler
51
65
52
66
extension RedisCommandHandler : ChannelInboundHandler {
53
- /// See `ChannelInboundHandler.InboundIn`
67
+ /// See `NIO. ChannelInboundHandler.InboundIn`
54
68
public typealias InboundIn = RESPValue
55
69
56
- /// Invoked by NIO when an error has been thrown. The command response promise at the front of the queue will be
57
- /// failed with the error.
70
+ /// Invoked by SwiftNIO when an error has been thrown. The command queue will be drained, with each promise in the queue being failed with the error thrown.
58
71
///
59
- /// See `ChannelInboundHandler.errorCaught(context:error:)`
72
+ /// See `NIO.ChannelInboundHandler.errorCaught(context:error:)`
73
+ /// - Important: This will also close the socket connection to Redis.
74
+ /// - Note:`RedisMetrics.commandFailureCount` is **not** incremented from this error.
75
+ ///
76
+ /// A `Logging.LogLevel.critical` message will be written with the caught error.
60
77
public func errorCaught( context: ChannelHandlerContext , error: Error ) {
61
78
let queue = self . commandResponseQueue
62
79
@@ -65,21 +82,22 @@ extension RedisCommandHandler: ChannelInboundHandler {
65
82
self . commandResponseQueue. removeAll ( )
66
83
queue. forEach { $0. fail ( error) }
67
84
68
- logger. critical ( " Error in channel pipeline. " , metadata: [ " error " : . string ( error. localizedDescription) ] )
69
-
70
- context. fireErrorCaught ( error )
85
+ self . logger. critical ( " Error in channel pipeline. " , metadata: [ " error " : " \ ( error. localizedDescription) " ] )
86
+
87
+ context. close ( promise : nil )
71
88
}
72
89
73
- /// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
74
- /// `RESPValue` to the promise awaiting a response at the front of the queue.
90
+ /// Invoked by SwiftNIO when a read has been fired from earlier in the response chain.
91
+ /// This forwards the decoded `RESPValue` response message to the promise waiting to be fulfilled at the front of the command queue.
92
+ /// - Note: `RedisMetrics.commandFailureCount` and `RedisMetrics.commandSuccessCount` are incremented from this method.
75
93
///
76
- /// See `ChannelInboundHandler.channelRead(context:data:)`
94
+ /// See `NIO. ChannelInboundHandler.channelRead(context:data:)`
77
95
public func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
78
96
let value = self . unwrapInboundIn ( data)
79
97
80
98
guard let leadPromise = self . commandResponseQueue. popFirst ( ) else {
81
99
assertionFailure ( " Read triggered with an empty promise queue! Ignoring: \( value) " )
82
- logger. critical ( " Read triggered with no promise waiting in the queue! " )
100
+ self . logger. critical ( " Read triggered with no promise waiting in the queue! " )
83
101
return
84
102
}
85
103
@@ -98,16 +116,16 @@ extension RedisCommandHandler: ChannelInboundHandler {
98
116
// MARK: ChannelOutboundHandler
99
117
100
118
extension RedisCommandHandler : ChannelOutboundHandler {
101
- /// See `ChannelOutboundHandler.OutboundIn`
102
- public typealias OutboundIn = RedisCommandContext
103
- /// See `ChannelOutboundHandler.OutboundOut`
119
+ /// See `NIO. ChannelOutboundHandler.OutboundIn`
120
+ public typealias OutboundIn = RedisCommand
121
+ /// See `NIO. ChannelOutboundHandler.OutboundOut`
104
122
public typealias OutboundOut = RESPValue
105
123
106
- /// Invoked by NIO when a `write` has been requested on the `Channel`.
107
- /// This unwraps a `RedisCommandContext `, retaining a callback to forward a response to later, and forwards
108
- /// the underlying command data further into the pipeline .
124
+ /// Invoked by SwiftNIO when a `write` has been requested on the `Channel`.
125
+ /// This unwraps a `RedisCommand `, storing the `NIO.EventLoopPromise` in a command queue,
126
+ /// to fulfill later with the response to the command that is about to be sent through the `NIO.Channel` .
109
127
///
110
- /// See `ChannelOutboundHandler.write(context:data:promise:)`
128
+ /// See `NIO. ChannelOutboundHandler.write(context:data:promise:)`
111
129
public func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
112
130
let commandContext = self . unwrapOutboundIn ( data)
113
131
self . commandResponseQueue. append ( commandContext. responsePromise)
0 commit comments