Skip to content

Commit b759863

Browse files
committed
Add RedisMessenger for handling read/write queues to Redis
1 parent a2753da commit b759863

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import NIO
2+
3+
/// `ChannelInboundHandler` that handles the responsibility of coordinating incoming and outgoing messages
4+
/// on a particular connection to Redis.
5+
internal final class RedisMessenger: ChannelInboundHandler {
6+
/// See `ChannelInboundHandler.InboundIn`
7+
public typealias InboundIn = RedisData
8+
9+
/// See `ChannelInboundHandler.OutboundOut`
10+
public typealias OutboundOut = RedisData
11+
12+
/// Queue of promises waiting to receive an incoming response value from a outgoing message.
13+
private var waitingResponseQueue: [EventLoopPromise<InboundIn>]
14+
/// Queue of unsent outgoing messages, with the oldest objects at the end of the array.
15+
private var outgoingMessageQueue: [OutboundOut]
16+
17+
/// This handler's event loop.
18+
private let eventLoop: EventLoop
19+
20+
/// Context used for writing outgoing messages with.
21+
private weak var outputContext: ChannelHandlerContext?
22+
23+
/// Creates a new handler that works on the specified `EventLoop`.
24+
public init(on eventLoop: EventLoop) {
25+
self.waitingResponseQueue = []
26+
self.outgoingMessageQueue = []
27+
self.eventLoop = eventLoop
28+
}
29+
30+
/// See `ChannelInboundHandler.channelActive(ctx:)`
31+
public func channelActive(ctx: ChannelHandlerContext) {
32+
outputContext = ctx
33+
_flushOutgoingQueue()
34+
}
35+
36+
/// See `ChannelInboundHandler.errorCaught(ctx:error:)`
37+
public func errorCaught(ctx: ChannelHandlerContext, error: Error) {
38+
guard let leadPromise = waitingResponseQueue.last else {
39+
return assertionFailure("Received unexpected error while idle: \(error.localizedDescription)")
40+
}
41+
leadPromise.fail(error: error)
42+
}
43+
44+
/// See `ChannelInboundHandler.channelRead(ctx:data:)`
45+
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
46+
let input = unwrapInboundIn(data)
47+
48+
guard let leadPromise = waitingResponseQueue.last else {
49+
return assertionFailure("Read triggered with an empty input queue! Ignoring: \(input)")
50+
}
51+
52+
let popped = waitingResponseQueue.popLast()
53+
assert(popped != nil)
54+
55+
leadPromise.succeed(result: input)
56+
}
57+
58+
/// Adds a complete message encoded as `RedisData` to the queue and returns an `EventLoopFuture` that resolves
59+
/// the response from Redis.
60+
func enqueue(_ output: OutboundOut) -> EventLoopFuture<InboundIn> {
61+
// ensure that we are on the event loop before modifying our data
62+
guard eventLoop.inEventLoop else {
63+
return eventLoop.submit { }.then { return self.enqueue(output) }
64+
}
65+
66+
// add the new output to the writing queue at the front
67+
outgoingMessageQueue.insert(output, at: 0)
68+
69+
// every outgoing message is expected to receive some form of response, so build
70+
// the context in the readQueue that we resolve with the response
71+
let promise = eventLoop.makePromise(of: InboundIn.self)
72+
waitingResponseQueue.insert(promise, at: 0)
73+
74+
// if we have a context for writing, flush the outgoing queue
75+
outputContext?.eventLoop.execute {
76+
self._flushOutgoingQueue()
77+
}
78+
79+
return promise.futureResult
80+
}
81+
82+
/// Writes all queued outgoing messages to the channel.
83+
func _flushOutgoingQueue() {
84+
guard let context = outputContext else { return }
85+
86+
while let output = outgoingMessageQueue.popLast() {
87+
context.write(wrapOutboundOut(output), promise: nil)
88+
context.flush()
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)