Skip to content

Commit 8bde6fd

Browse files
authored
Merge pull request #39 from Mordil/cleanup-handlers
Simplify RESP Parsing and Redis Channel Handlers
2 parents a8a1fa5 + 477668e commit 8bde6fd

11 files changed

+308
-290
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import NIO
2+
3+
/// Handles incoming byte messages from Redis
4+
/// and decodes them according to the Redis Serialization Protocol (RESP).
5+
///
6+
/// See `NIO.ByteToMessageDecoder`, `RESPTranslator` and [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
7+
public final class RedisByteDecoder: ByteToMessageDecoder {
8+
/// `ByteToMessageDecoder.InboundOut`
9+
public typealias InboundOut = RESPValue
10+
11+
/// See `ByteToMessageDecoder.decode(context:buffer:)`
12+
public func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
13+
var position = 0
14+
15+
switch try RESPTranslator.parseBytes(&buffer, fromIndex: &position) {
16+
case .incomplete: return .needMoreData
17+
case let .parsed(value):
18+
context.fireChannelRead(wrapInboundOut(value))
19+
buffer.moveReaderIndex(forwardBy: position)
20+
return .continue
21+
}
22+
}
23+
24+
/// See `ByteToMessageDecoder.decodeLast(context:buffer:seenEOF)`
25+
public func decodeLast(
26+
context: ChannelHandlerContext,
27+
buffer: inout ByteBuffer,
28+
seenEOF: Bool
29+
) throws -> DecodingState { return .needMoreData }
30+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import struct Logging.Logger
2+
import NIO
3+
4+
/// Encodes outgoing `RESPValue` data into a raw `ByteBuffer`
5+
/// according to the Redis Serialization Protocol (RESP).
6+
///
7+
/// See `NIO.MessageToByteEncoder`, `RESPTranslator`, and [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
8+
public final class RedisMessageEncoder: MessageToByteEncoder {
9+
/// See `MessageToByteEncoder.OutboundIn`
10+
public typealias OutboundIn = RESPValue
11+
12+
private let logger: Logger
13+
14+
public init(logger: Logger = Logger(label: "NIORedis.RedisMessageEncoder")) {
15+
self.logger = logger
16+
}
17+
18+
/// Encodes the `RedisValue` to bytes, following the RESP specification.
19+
///
20+
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol) and `RESPEncoder.encode(data:out:)`
21+
public func encode(data: RESPValue, out: inout ByteBuffer) throws {
22+
RESPTranslator.writeValue(data, into: &out)
23+
24+
logger.debug("Encoded \(data) to \(getPrintableString(for: &out))")
25+
}
26+
27+
// used only for debugging purposes where we build a formatted string for the encoded bytes
28+
private func getPrintableString(for buffer: inout ByteBuffer) -> String {
29+
return String(describing: buffer.getString(at: 0, length: buffer.readableBytes))
30+
.dropFirst(9)
31+
.dropLast()
32+
.description
33+
}
34+
}

Sources/NIORedis/RESP/RESPEncoder.swift

Lines changed: 0 additions & 71 deletions
This file was deleted.

Sources/NIORedis/RESP/RESPDecoder.swift renamed to Sources/NIORedis/RESP/RESPTranslator.swift

Lines changed: 91 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -11,51 +11,60 @@ extension UInt8 {
1111
static let colon: UInt8 = 0x3A
1212
}
1313

14-
/// Handles incoming byte messages from Redis and decodes them according to the RESP protocol.
14+
/// Provides methods for translating between byte streams and Swift types
15+
/// according to Redis Serialization Protocol (RESP).
1516
///
16-
/// See: [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
17-
public final class RESPDecoder {
18-
/// Representation of a `RESPDecoder.parse(at:from:) result, with either a decoded `RESPValue` or an indicator
19-
/// that the buffer contains an incomplete RESP message from the position provided.
20-
public enum ParsingState {
21-
case notYetParsed
17+
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
18+
public enum RESPTranslator { }
19+
20+
// MARK: From Bytes
21+
22+
extension RESPTranslator {
23+
/// Representation of the result of a parse attempt on a byte stream.
24+
/// - incomplete: The stream contains an incomplete RESP message from the position provided.
25+
/// - parsed: The parsed `RESPValue`
26+
public enum ParsingResult {
27+
case incomplete
2228
case parsed(RESPValue)
2329
}
2430

25-
/// Representation of an `Swift.Error` found during RESP decoding.
26-
public enum Error: LocalizedError {
31+
/// Representation of a `Swift.Error` found during RESP parsing.
32+
public enum ParsingError: LocalizedError {
2733
case invalidToken
2834
case arrayRecursion
2935

3036
public var errorDescription: String? {
31-
return "RESPDecoding: \(self)"
37+
return "RESPTranslator: \(self)"
3238
}
3339
}
3440

35-
public init() { }
36-
37-
/// Attempts to parse the `ByteBuffer`, starting at the specified position, following the RESP specification.
41+
/// Attempts to parse the `ByteBuffer`, starting at the specified position,
42+
/// following the RESP specification.
43+
/// - Important: As this par
3844
///
3945
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
4046
/// - Parameters:
41-
/// - buffer: The buffer that contains the bytes that need to be decoded.
42-
/// - position: The index of the buffer that should be considered the "front" to begin message parsing.
43-
public func parse(from buffer: inout ByteBuffer, index position: inout Int) throws -> ParsingState {
47+
/// - buffer: The buffer that contains the bytes that need to be parsed.
48+
/// - position: The index of the buffer that should contain the first byte of the message.
49+
public static func parseBytes(
50+
_ buffer: inout ByteBuffer,
51+
fromIndex position: inout Int
52+
) throws -> ParsingResult {
4453
let offset = position + buffer.readerIndex
4554
guard
4655
let token = buffer.viewBytes(at: offset, length: 1)?.first,
4756
var slice = buffer.getSlice(at: offset, length: buffer.readableBytes - position)
48-
else { return .notYetParsed }
57+
else { return .incomplete }
4958

5059
position += 1
5160

5261
switch token {
5362
case .plus:
54-
guard let result = parseSimpleString(&slice, &position) else { return .notYetParsed }
63+
guard let result = parseSimpleString(&slice, &position) else { return .incomplete }
5564
return .parsed(.simpleString(result))
5665

5766
case .colon:
58-
guard let value = parseInteger(&slice, &position) else { return .notYetParsed }
67+
guard let value = parseInteger(&slice, &position) else { return .incomplete }
5968
return .parsed(.integer(value))
6069

6170
case .dollar:
@@ -68,44 +77,15 @@ public final class RESPDecoder {
6877
guard
6978
let stringBuffer = parseSimpleString(&slice, &position),
7079
let message = stringBuffer.getString(at: 0, length: stringBuffer.readableBytes)
71-
else { return .notYetParsed }
80+
else { return .incomplete }
7281
return .parsed(.error(RedisError(reason: message)))
7382

74-
default: throw Error.invalidToken
75-
}
76-
}
77-
}
78-
79-
extension RESPDecoder: ByteToMessageDecoder {
80-
/// `ByteToMessageDecoder.InboundOut`
81-
public typealias InboundOut = RESPValue
82-
83-
/// See `ByteToMessageDecoder.decode(context:buffer:)`
84-
public func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
85-
var position = 0
86-
87-
switch try parse(from: &buffer, index: &position) {
88-
case .notYetParsed: return .needMoreData
89-
case let .parsed(value):
90-
context.fireChannelRead(wrapInboundOut(value))
91-
buffer.moveReaderIndex(forwardBy: position)
92-
return .continue
83+
default: throw ParsingError.invalidToken
9384
}
9485
}
9586

96-
/// See `ByteToMessageDecoder.decodeLast(context:buffer:seenEOF)`
97-
public func decodeLast(
98-
context: ChannelHandlerContext,
99-
buffer: inout ByteBuffer,
100-
seenEOF: Bool
101-
) throws -> DecodingState { return .needMoreData }
102-
}
103-
104-
// MARK: Parsing
105-
106-
extension RESPDecoder {
10787
/// See [https://redis.io/topics/protocol#resp-simple-strings](https://redis.io/topics/protocol#resp-simple-strings)
108-
func parseSimpleString(_ buffer: inout ByteBuffer, _ position: inout Int) -> ByteBuffer? {
88+
static func parseSimpleString(_ buffer: inout ByteBuffer, _ position: inout Int) -> ByteBuffer? {
10989
guard
11090
let bytes = buffer.viewBytes(at: position, length: buffer.readableBytes - position),
11191
let newlineIndex = bytes.firstIndex(of: .newline),
@@ -121,16 +101,16 @@ extension RESPDecoder {
121101
}
122102

123103
/// See [https://redis.io/topics/protocol#resp-integers](https://redis.io/topics/protocol#resp-integers)
124-
func parseInteger(_ buffer: inout ByteBuffer, _ position: inout Int) -> Int? {
104+
static func parseInteger(_ buffer: inout ByteBuffer, _ position: inout Int) -> Int? {
125105
guard let stringBuffer = parseSimpleString(&buffer, &position) else { return nil }
126106
return stringBuffer.withUnsafeReadableBytes { ptr in
127107
Int(strtoll(ptr.bindMemory(to: Int8.self).baseAddress!, nil, 10))
128108
}
129109
}
130110

131111
/// See [https://redis.io/topics/protocol#resp-bulk-strings](https://redis.io/topics/protocol#resp-bulk-strings)
132-
func parseBulkString(_ buffer: inout ByteBuffer, _ position: inout Int) -> ParsingState {
133-
guard let size = parseInteger(&buffer, &position) else { return .notYetParsed }
112+
static func parseBulkString(_ buffer: inout ByteBuffer, _ position: inout Int) -> ParsingResult {
113+
guard let size = parseInteger(&buffer, &position) else { return .incomplete }
134114

135115
// Redis sends '$-1\r\n' to represent a null bulk string
136116
guard size > -1 else { return .parsed(.null) }
@@ -140,7 +120,7 @@ extension RESPDecoder {
140120
// even if the content is empty, Redis send '$0\r\n\r\n'
141121
let readableByteCount = buffer.readableBytes - position
142122
let expectedRemainingMessageSize = size + 2
143-
guard readableByteCount >= expectedRemainingMessageSize else { return .notYetParsed }
123+
guard readableByteCount >= expectedRemainingMessageSize else { return .incomplete }
144124

145125
// empty bulk strings, different from null strings, are represented as .bulkString(nil)
146126
guard size > 0 else {
@@ -150,7 +130,7 @@ extension RESPDecoder {
150130
}
151131

152132
guard let bytes = buffer.viewBytes(at: position, length: expectedRemainingMessageSize) else {
153-
return .notYetParsed
133+
return .incomplete
154134
}
155135

156136
// move the parsing position to the newline for recursive parsing
@@ -162,31 +142,79 @@ extension RESPDecoder {
162142
}
163143

164144
/// See [https://redis.io/topics/protocol#resp-arrays](https://redis.io/topics/protocol#resp-arrays)
165-
func parseArray(_ buffer: inout ByteBuffer, _ position: inout Int) throws -> ParsingState {
166-
guard let elementCount = parseInteger(&buffer, &position) else { return .notYetParsed }
145+
static func parseArray(_ buffer: inout ByteBuffer, _ position: inout Int) throws -> ParsingResult {
146+
guard let elementCount = parseInteger(&buffer, &position) else { return .incomplete }
167147
guard elementCount > -1 else { return .parsed(.null) } // '*-1\r\n'
168148
guard elementCount > 0 else { return .parsed(.array([])) } // '*0\r\n'
169149

170-
var results = [ParsingState](repeating: .notYetParsed, count: elementCount)
150+
var results = [ParsingResult](repeating: .incomplete, count: elementCount)
171151
for index in 0..<elementCount {
172152
guard
173153
var slice = buffer.getSlice(at: position, length: buffer.readableBytes - position)
174-
else { return .notYetParsed }
154+
else { return .incomplete }
175155

176156
var subPosition = 0
177-
let result = try parse(from: &slice, index: &subPosition)
157+
let result = try parseBytes(&slice, fromIndex: &subPosition)
178158
switch result {
179159
case .parsed: results[index] = result
180-
default: return .notYetParsed
160+
default: return .incomplete
181161
}
182162

183163
position += subPosition
184164
}
185165

186166
let values = try results.map { state -> RESPValue in
187-
guard case let .parsed(value) = state else { throw Error.arrayRecursion }
167+
guard case let .parsed(value) = state else { throw ParsingError.arrayRecursion }
188168
return value
189169
}
190170
return .parsed(.array(ContiguousArray<RESPValue>(values)))
191171
}
192172
}
173+
174+
// MARK: To Bytes
175+
176+
extension RESPTranslator {
177+
/// Writes the `RESPValue` into the provided `ByteBuffer` following the RESP specification.
178+
///
179+
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
180+
/// - Parameters:
181+
/// - value: The value to write to the buffer.
182+
/// - out: The buffer being written to.
183+
public static func writeValue(_ value: RESPValue, into out: inout ByteBuffer) {
184+
switch value {
185+
case .simpleString(var buffer):
186+
out.writeStaticString("+")
187+
out.writeBuffer(&buffer)
188+
out.writeStaticString("\r\n")
189+
190+
case .bulkString(.some(var buffer)):
191+
out.writeStaticString("$")
192+
out.writeString(buffer.readableBytes.description)
193+
out.writeStaticString("\r\n")
194+
out.writeBuffer(&buffer)
195+
out.writeString("\r\n")
196+
197+
case .bulkString(.none):
198+
out.writeStaticString("$0\r\n\r\n")
199+
200+
case .integer(let number):
201+
out.writeStaticString(":")
202+
out.writeString(number.description)
203+
out.writeStaticString("\r\n")
204+
205+
case .null:
206+
out.writeStaticString("$-1\r\n")
207+
208+
case .error(let error):
209+
out.writeStaticString("-")
210+
out.writeString(error.message)
211+
out.writeStaticString("\r\n")
212+
213+
case .array(let array):
214+
out.writeStaticString("*")
215+
out.writeString(array.count.description)
216+
out.writeStaticString("\r\n")
217+
array.forEach { writeValue($0, into: &out) }
218+
}
219+
}
220+
}

0 commit comments

Comments
 (0)