@@ -32,16 +32,16 @@ public struct RedisCommandContext {
32
32
/// A `ChannelDuplexHandler` that works with `RedisCommandContext`s to send commands and forward responses.
33
33
open class RedisCommandHandler {
34
34
/// Queue of promises waiting to receive a response value from a sent command.
35
- private var commandResponseQueue : [ EventLoopPromise < RESPValue > ]
35
+ private var commandResponseQueue : CircularBuffer < EventLoopPromise < RESPValue > >
36
36
private var logger : Logger
37
37
38
38
deinit {
39
39
guard commandResponseQueue. count > 0 else { return }
40
40
logger. warning ( " Command handler deinit when queue is not empty. Current size: \( commandResponseQueue. count) " )
41
41
}
42
42
43
- public init ( logger: Logger = Logger ( label: " RedisNIO.CommandHandler " ) ) {
44
- self . commandResponseQueue = [ ]
43
+ public init ( logger: Logger = Logger ( label: " RedisNIO.CommandHandler " ) , initialQueueCapacity : Int = 5 ) {
44
+ self . commandResponseQueue = CircularBuffer ( initialCapacity : initialQueueCapacity )
45
45
self . logger = logger
46
46
self . logger [ metadataKey: " CommandHandler " ] = " \( UUID ( ) ) "
47
47
}
@@ -58,30 +58,31 @@ extension RedisCommandHandler: ChannelInboundHandler {
58
58
///
59
59
/// See `ChannelInboundHandler.errorCaught(context:error:)`
60
60
public func errorCaught( context: ChannelHandlerContext , error: Error ) {
61
- guard let leadPromise = commandResponseQueue. last else {
62
- return assertionFailure ( " Received unexpected error while idle: \( error. localizedDescription) " )
63
- }
64
- leadPromise. fail ( error)
61
+ let queue = self . commandResponseQueue
62
+
63
+ assert ( queue. count > 0 , " Received unexpected error while idle: \( error. localizedDescription) " )
64
+
65
+ self . commandResponseQueue. removeAll ( )
66
+ queue. forEach { $0. fail ( error) }
67
+
68
+ logger. critical ( " Error in channel pipeline. " , metadata: [ " error " : . string( error. localizedDescription) ] )
69
+
65
70
context. fireErrorCaught ( error)
66
- RedisMetrics . commandFailureCount. increment ( )
67
71
}
68
72
69
73
/// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
70
74
/// `RESPValue` to the promise awaiting a response at the front of the queue.
71
75
///
72
76
/// See `ChannelInboundHandler.channelRead(context:data:)`
73
77
public func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
74
- let value = unwrapInboundIn ( data)
78
+ let value = self . unwrapInboundIn ( data)
75
79
76
- guard let leadPromise = commandResponseQueue. last else {
80
+ guard let leadPromise = self . commandResponseQueue. popFirst ( ) else {
77
81
assertionFailure ( " Read triggered with an empty promise queue! Ignoring: \( value) " )
78
82
logger. critical ( " Read triggered with no promise waiting in the queue! " )
79
83
return
80
84
}
81
85
82
- let popped = commandResponseQueue. popLast ( )
83
- assert ( popped != nil )
84
-
85
86
switch value {
86
87
case . error( let e) :
87
88
leadPromise. fail ( e)
@@ -108,8 +109,11 @@ extension RedisCommandHandler: ChannelOutboundHandler {
108
109
///
109
110
/// See `ChannelOutboundHandler.write(context:data:promise:)`
110
111
public func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
111
- let commandContext = unwrapOutboundIn ( data)
112
- commandResponseQueue. insert ( commandContext. responsePromise, at: 0 )
113
- context. write ( wrapOutboundOut ( commandContext. command) , promise: promise)
112
+ let commandContext = self . unwrapOutboundIn ( data)
113
+ self . commandResponseQueue. append ( commandContext. responsePromise)
114
+ context. write (
115
+ self . wrapOutboundOut ( commandContext. command) ,
116
+ promise: promise
117
+ )
114
118
}
115
119
}
0 commit comments