Skip to content

Commit 8cdcffd

Browse files
committed
Add RedisCommandHandler to eventually replace RedisMessenger that works more deeply with NIO
1 parent 19cbadc commit 8cdcffd

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import Foundation
2+
import NIO
3+
4+
/// A context for `RedisCommandHandler` to operate within.
5+
public struct RedisCommandContext {
6+
/// A full command keyword and arguments stored as a single `RESPValue`.
7+
public let command: RESPValue
8+
/// A promise expected to be fulfilled with the `RESPValue` response to the command from Redis.
9+
public let promise: EventLoopPromise<RESPValue>
10+
}
11+
12+
/// A `ChannelDuplexHandler` that works with `RedisCommandContext`s to send commands and forward responses.
13+
open class RedisCommandHandler {
14+
/// Queue of promises waiting to receive a response value from a sent command.
15+
private var commandResponseQueue: [EventLoopPromise<RESPValue>]
16+
17+
public init() {
18+
self.commandResponseQueue = []
19+
}
20+
}
21+
22+
// MARK: ChannelInboundHandler
23+
24+
extension RedisCommandHandler: ChannelInboundHandler {
25+
/// See `ChannelInboundHandler.InboundIn`
26+
public typealias InboundIn = RESPValue
27+
28+
/// Invoked by NIO when an error has been thrown. The command response promise at the front of the queue will be
29+
/// failed with the error.
30+
///
31+
/// See `ChannelInboundHandler.errorCaught(ctx:error:)`
32+
public func errorCaught(ctx: ChannelHandlerContext, error: Error) {
33+
guard let leadPromise = commandResponseQueue.last else {
34+
return assertionFailure("Received unexpected error while idle: \(error.localizedDescription)")
35+
}
36+
leadPromise.fail(error: error)
37+
}
38+
39+
/// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
40+
/// `RESPValue` to the promise awaiting a response at the front of the queue.
41+
///
42+
/// See `ChannelInboundHandler.channelRead(ctx:data:)`
43+
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
44+
let value = unwrapInboundIn(data)
45+
46+
guard let leadPromise = commandResponseQueue.last else {
47+
return assertionFailure("Read triggered with an empty promise queue! Ignoring: \(value)")
48+
}
49+
50+
let popped = commandResponseQueue.popLast()
51+
assert(popped != nil)
52+
53+
switch value {
54+
case .error(let e): leadPromise.fail(error: e)
55+
default: leadPromise.succeed(result: value)
56+
}
57+
}
58+
}
59+
60+
// MARK: ChannelOutboundHandler
61+
62+
extension RedisCommandHandler: ChannelOutboundHandler {
63+
/// See `ChannelOutboundHandler.OutboundIn`
64+
public typealias OutboundIn = RedisCommandContext
65+
/// See `ChannelOutboundHandler.OutboundOut`
66+
public typealias OutboundOut = RESPValue
67+
68+
/// Invoked by NIO when a `write` has been requested on the `Channel`.
69+
/// This unwraps a `RedisCommandContext`, retaining a callback to forward a response to later, and forwards
70+
/// the underlying command data further into the pipeline.
71+
///
72+
/// See `ChannelOutboundHandler.write(ctx:data:promise:)`
73+
public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
74+
let context = unwrapOutboundIn(data)
75+
commandResponseQueue.insert(context.promise, at: 0)
76+
ctx.write(wrapOutboundOut(context.command), promise: promise)
77+
}
78+
}

Sources/NIORedis/ChannelHandlers/RedisMessenger.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import NIO
22

33
/// `ChannelInboundHandler` that is responsible for coordinating incoming and outgoing messages on a particular
44
/// connection to Redis.
5+
@available(*, deprecated)
56
internal final class RedisMessenger {
67
private let eventLoop: EventLoop
78

0 commit comments

Comments
 (0)