Skip to content

Commit 477668e

Browse files
committed
Simplify RESP Parsing and Redis Channel Handlers
Motivation: As it stands, the parsing / encoding implementations for RESP was directly tied to the NIO Channel Handlers. Unit tests were sloppily spread across multiple files and it made it difficult to explicitly separate out the Channel Handler behavior from the RESP specification implementation. Modifications: - Add: `RESPTranslator` enum helper object with static methods that only handles RESP parsing / encoding - Rename: `RESPEncoder` to `RedisMessageEncoder` - Rename: `RESPDecoder` to `RedisByteDecoder` Results: It should be easier to understand what type is intended to be used as part of a NIO channel pipeline while still having a direct way of parsing / encoding between Swift types and the RESP specification. Unit tests should be more maintainable as well.
1 parent a8a1fa5 commit 477668e

11 files changed

+308
-290
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import NIO
2+
3+
/// Handles incoming byte messages from Redis
4+
/// and decodes them according to the Redis Serialization Protocol (RESP).
5+
///
6+
/// See `NIO.ByteToMessageDecoder`, `RESPTranslator` and [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
7+
public final class RedisByteDecoder: ByteToMessageDecoder {
8+
/// `ByteToMessageDecoder.InboundOut`
9+
public typealias InboundOut = RESPValue
10+
11+
/// See `ByteToMessageDecoder.decode(context:buffer:)`
12+
public func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
13+
var position = 0
14+
15+
switch try RESPTranslator.parseBytes(&buffer, fromIndex: &position) {
16+
case .incomplete: return .needMoreData
17+
case let .parsed(value):
18+
context.fireChannelRead(wrapInboundOut(value))
19+
buffer.moveReaderIndex(forwardBy: position)
20+
return .continue
21+
}
22+
}
23+
24+
/// See `ByteToMessageDecoder.decodeLast(context:buffer:seenEOF)`
25+
public func decodeLast(
26+
context: ChannelHandlerContext,
27+
buffer: inout ByteBuffer,
28+
seenEOF: Bool
29+
) throws -> DecodingState { return .needMoreData }
30+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import struct Logging.Logger
2+
import NIO
3+
4+
/// Encodes outgoing `RESPValue` data into a raw `ByteBuffer`
5+
/// according to the Redis Serialization Protocol (RESP).
6+
///
7+
/// See `NIO.MessageToByteEncoder`, `RESPTranslator`, and [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
8+
public final class RedisMessageEncoder: MessageToByteEncoder {
9+
/// See `MessageToByteEncoder.OutboundIn`
10+
public typealias OutboundIn = RESPValue
11+
12+
private let logger: Logger
13+
14+
public init(logger: Logger = Logger(label: "NIORedis.RedisMessageEncoder")) {
15+
self.logger = logger
16+
}
17+
18+
/// Encodes the `RedisValue` to bytes, following the RESP specification.
19+
///
20+
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol) and `RESPEncoder.encode(data:out:)`
21+
public func encode(data: RESPValue, out: inout ByteBuffer) throws {
22+
RESPTranslator.writeValue(data, into: &out)
23+
24+
logger.debug("Encoded \(data) to \(getPrintableString(for: &out))")
25+
}
26+
27+
// used only for debugging purposes where we build a formatted string for the encoded bytes
28+
private func getPrintableString(for buffer: inout ByteBuffer) -> String {
29+
return String(describing: buffer.getString(at: 0, length: buffer.readableBytes))
30+
.dropFirst(9)
31+
.dropLast()
32+
.description
33+
}
34+
}

Sources/NIORedis/RESP/RESPEncoder.swift

Lines changed: 0 additions & 71 deletions
This file was deleted.

Sources/NIORedis/RESP/RESPDecoder.swift renamed to Sources/NIORedis/RESP/RESPTranslator.swift

Lines changed: 91 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -11,51 +11,60 @@ extension UInt8 {
1111
static let colon: UInt8 = 0x3A
1212
}
1313

14-
/// Handles incoming byte messages from Redis and decodes them according to the RESP protocol.
14+
/// Provides methods for translating between byte streams and Swift types
15+
/// according to Redis Serialization Protocol (RESP).
1516
///
16-
/// See: [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
17-
public final class RESPDecoder {
18-
/// Representation of a `RESPDecoder.parse(at:from:) result, with either a decoded `RESPValue` or an indicator
19-
/// that the buffer contains an incomplete RESP message from the position provided.
20-
public enum ParsingState {
21-
case notYetParsed
17+
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
18+
public enum RESPTranslator { }
19+
20+
// MARK: From Bytes
21+
22+
extension RESPTranslator {
23+
/// Representation of the result of a parse attempt on a byte stream.
24+
/// - incomplete: The stream contains an incomplete RESP message from the position provided.
25+
/// - parsed: The parsed `RESPValue`
26+
public enum ParsingResult {
27+
case incomplete
2228
case parsed(RESPValue)
2329
}
2430

25-
/// Representation of an `Swift.Error` found during RESP decoding.
26-
public enum Error: LocalizedError {
31+
/// Representation of a `Swift.Error` found during RESP parsing.
32+
public enum ParsingError: LocalizedError {
2733
case invalidToken
2834
case arrayRecursion
2935

3036
public var errorDescription: String? {
31-
return "RESPDecoding: \(self)"
37+
return "RESPTranslator: \(self)"
3238
}
3339
}
3440

35-
public init() { }
36-
37-
/// Attempts to parse the `ByteBuffer`, starting at the specified position, following the RESP specification.
41+
/// Attempts to parse the `ByteBuffer`, starting at the specified position,
42+
/// following the RESP specification.
43+
/// - Important: As this par
3844
///
3945
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
4046
/// - Parameters:
41-
/// - buffer: The buffer that contains the bytes that need to be decoded.
42-
/// - position: The index of the buffer that should be considered the "front" to begin message parsing.
43-
public func parse(from buffer: inout ByteBuffer, index position: inout Int) throws -> ParsingState {
47+
/// - buffer: The buffer that contains the bytes that need to be parsed.
48+
/// - position: The index of the buffer that should contain the first byte of the message.
49+
public static func parseBytes(
50+
_ buffer: inout ByteBuffer,
51+
fromIndex position: inout Int
52+
) throws -> ParsingResult {
4453
let offset = position + buffer.readerIndex
4554
guard
4655
let token = buffer.viewBytes(at: offset, length: 1)?.first,
4756
var slice = buffer.getSlice(at: offset, length: buffer.readableBytes - position)
48-
else { return .notYetParsed }
57+
else { return .incomplete }
4958

5059
position += 1
5160

5261
switch token {
5362
case .plus:
54-
guard let result = parseSimpleString(&slice, &position) else { return .notYetParsed }
63+
guard let result = parseSimpleString(&slice, &position) else { return .incomplete }
5564
return .parsed(.simpleString(result))
5665

5766
case .colon:
58-
guard let value = parseInteger(&slice, &position) else { return .notYetParsed }
67+
guard let value = parseInteger(&slice, &position) else { return .incomplete }
5968
return .parsed(.integer(value))
6069

6170
case .dollar:
@@ -68,44 +77,15 @@ public final class RESPDecoder {
6877
guard
6978
let stringBuffer = parseSimpleString(&slice, &position),
7079
let message = stringBuffer.getString(at: 0, length: stringBuffer.readableBytes)
71-
else { return .notYetParsed }
80+
else { return .incomplete }
7281
return .parsed(.error(RedisError(reason: message)))
7382

74-
default: throw Error.invalidToken
75-
}
76-
}
77-
}
78-
79-
extension RESPDecoder: ByteToMessageDecoder {
80-
/// `ByteToMessageDecoder.InboundOut`
81-
public typealias InboundOut = RESPValue
82-
83-
/// See `ByteToMessageDecoder.decode(context:buffer:)`
84-
public func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
85-
var position = 0
86-
87-
switch try parse(from: &buffer, index: &position) {
88-
case .notYetParsed: return .needMoreData
89-
case let .parsed(value):
90-
context.fireChannelRead(wrapInboundOut(value))
91-
buffer.moveReaderIndex(forwardBy: position)
92-
return .continue
83+
default: throw ParsingError.invalidToken
9384
}
9485
}
9586

96-
/// See `ByteToMessageDecoder.decodeLast(context:buffer:seenEOF)`
97-
public func decodeLast(
98-
context: ChannelHandlerContext,
99-
buffer: inout ByteBuffer,
100-
seenEOF: Bool
101-
) throws -> DecodingState { return .needMoreData }
102-
}
103-
104-
// MARK: Parsing
105-
106-
extension RESPDecoder {
10787
/// See [https://redis.io/topics/protocol#resp-simple-strings](https://redis.io/topics/protocol#resp-simple-strings)
108-
func parseSimpleString(_ buffer: inout ByteBuffer, _ position: inout Int) -> ByteBuffer? {
88+
static func parseSimpleString(_ buffer: inout ByteBuffer, _ position: inout Int) -> ByteBuffer? {
10989
guard
11090
let bytes = buffer.viewBytes(at: position, length: buffer.readableBytes - position),
11191
let newlineIndex = bytes.firstIndex(of: .newline),
@@ -121,16 +101,16 @@ extension RESPDecoder {
121101
}
122102

123103
/// See [https://redis.io/topics/protocol#resp-integers](https://redis.io/topics/protocol#resp-integers)
124-
func parseInteger(_ buffer: inout ByteBuffer, _ position: inout Int) -> Int? {
104+
static func parseInteger(_ buffer: inout ByteBuffer, _ position: inout Int) -> Int? {
125105
guard let stringBuffer = parseSimpleString(&buffer, &position) else { return nil }
126106
return stringBuffer.withUnsafeReadableBytes { ptr in
127107
Int(strtoll(ptr.bindMemory(to: Int8.self).baseAddress!, nil, 10))
128108
}
129109
}
130110

131111
/// See [https://redis.io/topics/protocol#resp-bulk-strings](https://redis.io/topics/protocol#resp-bulk-strings)
132-
func parseBulkString(_ buffer: inout ByteBuffer, _ position: inout Int) -> ParsingState {
133-
guard let size = parseInteger(&buffer, &position) else { return .notYetParsed }
112+
static func parseBulkString(_ buffer: inout ByteBuffer, _ position: inout Int) -> ParsingResult {
113+
guard let size = parseInteger(&buffer, &position) else { return .incomplete }
134114

135115
// Redis sends '$-1\r\n' to represent a null bulk string
136116
guard size > -1 else { return .parsed(.null) }
@@ -140,7 +120,7 @@ extension RESPDecoder {
140120
// even if the content is empty, Redis send '$0\r\n\r\n'
141121
let readableByteCount = buffer.readableBytes - position
142122
let expectedRemainingMessageSize = size + 2
143-
guard readableByteCount >= expectedRemainingMessageSize else { return .notYetParsed }
123+
guard readableByteCount >= expectedRemainingMessageSize else { return .incomplete }
144124

145125
// empty bulk strings, different from null strings, are represented as .bulkString(nil)
146126
guard size > 0 else {
@@ -150,7 +130,7 @@ extension RESPDecoder {
150130
}
151131

152132
guard let bytes = buffer.viewBytes(at: position, length: expectedRemainingMessageSize) else {
153-
return .notYetParsed
133+
return .incomplete
154134
}
155135

156136
// move the parsing position to the newline for recursive parsing
@@ -162,31 +142,79 @@ extension RESPDecoder {
162142
}
163143

164144
/// See [https://redis.io/topics/protocol#resp-arrays](https://redis.io/topics/protocol#resp-arrays)
165-
func parseArray(_ buffer: inout ByteBuffer, _ position: inout Int) throws -> ParsingState {
166-
guard let elementCount = parseInteger(&buffer, &position) else { return .notYetParsed }
145+
static func parseArray(_ buffer: inout ByteBuffer, _ position: inout Int) throws -> ParsingResult {
146+
guard let elementCount = parseInteger(&buffer, &position) else { return .incomplete }
167147
guard elementCount > -1 else { return .parsed(.null) } // '*-1\r\n'
168148
guard elementCount > 0 else { return .parsed(.array([])) } // '*0\r\n'
169149

170-
var results = [ParsingState](repeating: .notYetParsed, count: elementCount)
150+
var results = [ParsingResult](repeating: .incomplete, count: elementCount)
171151
for index in 0..<elementCount {
172152
guard
173153
var slice = buffer.getSlice(at: position, length: buffer.readableBytes - position)
174-
else { return .notYetParsed }
154+
else { return .incomplete }
175155

176156
var subPosition = 0
177-
let result = try parse(from: &slice, index: &subPosition)
157+
let result = try parseBytes(&slice, fromIndex: &subPosition)
178158
switch result {
179159
case .parsed: results[index] = result
180-
default: return .notYetParsed
160+
default: return .incomplete
181161
}
182162

183163
position += subPosition
184164
}
185165

186166
let values = try results.map { state -> RESPValue in
187-
guard case let .parsed(value) = state else { throw Error.arrayRecursion }
167+
guard case let .parsed(value) = state else { throw ParsingError.arrayRecursion }
188168
return value
189169
}
190170
return .parsed(.array(ContiguousArray<RESPValue>(values)))
191171
}
192172
}
173+
174+
// MARK: To Bytes
175+
176+
extension RESPTranslator {
177+
/// Writes the `RESPValue` into the provided `ByteBuffer` following the RESP specification.
178+
///
179+
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
180+
/// - Parameters:
181+
/// - value: The value to write to the buffer.
182+
/// - out: The buffer being written to.
183+
public static func writeValue(_ value: RESPValue, into out: inout ByteBuffer) {
184+
switch value {
185+
case .simpleString(var buffer):
186+
out.writeStaticString("+")
187+
out.writeBuffer(&buffer)
188+
out.writeStaticString("\r\n")
189+
190+
case .bulkString(.some(var buffer)):
191+
out.writeStaticString("$")
192+
out.writeString(buffer.readableBytes.description)
193+
out.writeStaticString("\r\n")
194+
out.writeBuffer(&buffer)
195+
out.writeString("\r\n")
196+
197+
case .bulkString(.none):
198+
out.writeStaticString("$0\r\n\r\n")
199+
200+
case .integer(let number):
201+
out.writeStaticString(":")
202+
out.writeString(number.description)
203+
out.writeStaticString("\r\n")
204+
205+
case .null:
206+
out.writeStaticString("$-1\r\n")
207+
208+
case .error(let error):
209+
out.writeStaticString("-")
210+
out.writeString(error.message)
211+
out.writeStaticString("\r\n")
212+
213+
case .array(let array):
214+
out.writeStaticString("*")
215+
out.writeString(array.count.description)
216+
out.writeStaticString("\r\n")
217+
array.forEach { writeValue($0, into: &out) }
218+
}
219+
}
220+
}

0 commit comments

Comments
 (0)