Skip to content

Commit e0e5a68

Browse files
committed
Merge branch '53-circular-buffer' into 'master'
53 -- Use `CircularBuffer` for `RedisCommandHandler` queue Closes #53 See merge request Mordil/swift-redis-nio-client!59
2 parents 5672c55 + 88c6cfa commit e0e5a68

File tree

1 file changed

+20
-16
lines changed

1 file changed

+20
-16
lines changed

Sources/RedisNIO/ChannelHandlers/RedisCommandHandler.swift

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,16 @@ public struct RedisCommandContext {
3232
/// A `ChannelDuplexHandler` that works with `RedisCommandContext`s to send commands and forward responses.
3333
open class RedisCommandHandler {
3434
/// Queue of promises waiting to receive a response value from a sent command.
35-
private var commandResponseQueue: [EventLoopPromise<RESPValue>]
35+
private var commandResponseQueue: CircularBuffer<EventLoopPromise<RESPValue>>
3636
private var logger: Logger
3737

3838
deinit {
3939
guard commandResponseQueue.count > 0 else { return }
4040
logger.warning("Command handler deinit when queue is not empty. Current size: \(commandResponseQueue.count)")
4141
}
4242

43-
public init(logger: Logger = Logger(label: "RedisNIO.CommandHandler")) {
44-
self.commandResponseQueue = []
43+
public init(logger: Logger = Logger(label: "RedisNIO.CommandHandler"), initialQueueCapacity: Int = 5) {
44+
self.commandResponseQueue = CircularBuffer(initialCapacity: initialQueueCapacity)
4545
self.logger = logger
4646
self.logger[metadataKey: "CommandHandler"] = "\(UUID())"
4747
}
@@ -58,30 +58,31 @@ extension RedisCommandHandler: ChannelInboundHandler {
5858
///
5959
/// See `ChannelInboundHandler.errorCaught(context:error:)`
6060
public func errorCaught(context: ChannelHandlerContext, error: Error) {
61-
guard let leadPromise = commandResponseQueue.last else {
62-
return assertionFailure("Received unexpected error while idle: \(error.localizedDescription)")
63-
}
64-
leadPromise.fail(error)
61+
let queue = self.commandResponseQueue
62+
63+
assert(queue.count > 0, "Received unexpected error while idle: \(error.localizedDescription)")
64+
65+
self.commandResponseQueue.removeAll()
66+
queue.forEach { $0.fail(error) }
67+
68+
logger.critical("Error in channel pipeline.", metadata: ["error": .string(error.localizedDescription)])
69+
6570
context.fireErrorCaught(error)
66-
RedisMetrics.commandFailureCount.increment()
6771
}
6872

6973
/// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
7074
/// `RESPValue` to the promise awaiting a response at the front of the queue.
7175
///
7276
/// See `ChannelInboundHandler.channelRead(context:data:)`
7377
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
74-
let value = unwrapInboundIn(data)
78+
let value = self.unwrapInboundIn(data)
7579

76-
guard let leadPromise = commandResponseQueue.last else {
80+
guard let leadPromise = self.commandResponseQueue.popFirst() else {
7781
assertionFailure("Read triggered with an empty promise queue! Ignoring: \(value)")
7882
logger.critical("Read triggered with no promise waiting in the queue!")
7983
return
8084
}
8185

82-
let popped = commandResponseQueue.popLast()
83-
assert(popped != nil)
84-
8586
switch value {
8687
case .error(let e):
8788
leadPromise.fail(e)
@@ -108,8 +109,11 @@ extension RedisCommandHandler: ChannelOutboundHandler {
108109
///
109110
/// See `ChannelOutboundHandler.write(context:data:promise:)`
110111
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
111-
let commandContext = unwrapOutboundIn(data)
112-
commandResponseQueue.insert(commandContext.responsePromise, at: 0)
113-
context.write(wrapOutboundOut(commandContext.command), promise: promise)
112+
let commandContext = self.unwrapOutboundIn(data)
113+
self.commandResponseQueue.append(commandContext.responsePromise)
114+
context.write(
115+
self.wrapOutboundOut(commandContext.command),
116+
promise: promise
117+
)
114118
}
115119
}

0 commit comments

Comments
 (0)