Skip to content

Commit a6802a8

Browse files
authored
Add GRPCMessageDeframer (#1776)
1 parent be8e15d commit a6802a8

File tree

3 files changed

+324
-0
lines changed

3 files changed

+324
-0
lines changed

Package.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ extension Target.Dependency {
128128
name: "NIOTransportServices",
129129
package: "swift-nio-transport-services"
130130
)
131+
static let nioTestUtils: Self = .product(name: "NIOTestUtils", package: "swift-nio")
131132
static let logging: Self = .product(name: "Logging", package: "swift-log")
132133
static let protobuf: Self = .product(name: "SwiftProtobuf", package: "swift-protobuf")
133134
static let protobufPluginLibrary: Self = .product(
@@ -312,6 +313,7 @@ extension Target {
312313
.nioCore,
313314
.nioHTTP2,
314315
.nioEmbedded,
316+
.nioTestUtils,
315317
]
316318
)
317319

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import GRPCCore
18+
import NIOCore
19+
20+
/// A ``GRPCMessageDeframer`` helps with the deframing of gRPC data frames:
21+
/// - It reads the frame's metadata to know whether the message payload is compressed or not, and its length
22+
/// - It reads and decompresses the payload, if compressed
23+
/// - It helps put together frames that have been split across multiple `ByteBuffers` by the underlying transport
24+
struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder {
25+
/// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
26+
static let metadataLength = 5
27+
static let defaultMaximumPayloadSize = Int.max
28+
29+
typealias InboundOut = [UInt8]
30+
31+
private let decompressor: Zlib.Decompressor?
32+
private let maximumPayloadSize: Int
33+
34+
/// Create a new ``GRPCMessageDeframer``.
35+
/// - Parameters:
36+
/// - maximumPayloadSize: The maximum size a message payload can be.
37+
/// - decompressor: A `Zlib.Decompressor` to use when decompressing compressed gRPC messages.
38+
/// - Important: You must call `end()` on the `decompressor` when you're done using it, to clean
39+
/// up any resources allocated by `Zlib`.
40+
init(
41+
maximumPayloadSize: Int = Self.defaultMaximumPayloadSize,
42+
decompressor: Zlib.Decompressor? = nil
43+
) {
44+
self.maximumPayloadSize = maximumPayloadSize
45+
self.decompressor = decompressor
46+
}
47+
48+
mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
49+
guard buffer.readableBytes >= Self.metadataLength else {
50+
// If we cannot read enough bytes to cover the metadata's length, then we
51+
// need to wait for more bytes to become available to us.
52+
return nil
53+
}
54+
55+
// Store the current reader index in case we don't yet have enough
56+
// bytes in the buffer to decode a full frame, and need to reset it.
57+
// The force-unwraps for the compression flag and message length are safe,
58+
// because we've checked just above that we've got at least enough bytes to
59+
// read all of the metadata.
60+
let originalReaderIndex = buffer.readerIndex
61+
let isMessageCompressed = buffer.readInteger(as: UInt8.self)! == 1
62+
let messageLength = buffer.readInteger(as: UInt32.self)!
63+
64+
if messageLength > self.maximumPayloadSize {
65+
throw RPCError(
66+
code: .resourceExhausted,
67+
message: """
68+
Message has exceeded the configured maximum payload size \
69+
(max: \(self.maximumPayloadSize), actual: \(messageLength))
70+
"""
71+
)
72+
}
73+
74+
guard var message = buffer.readSlice(length: Int(messageLength)) else {
75+
// `ByteBuffer/readSlice(length:)` returns nil when there are not enough
76+
// bytes to read the requested length. This can happen if we don't yet have
77+
// enough bytes buffered to read the full message payload.
78+
// By reading the metadata though, we have already moved the reader index,
79+
// so we must reset it to its previous, original position for now,
80+
// and return. We'll try decoding again, once more bytes become available
81+
// in our buffer.
82+
buffer.moveReaderIndex(to: originalReaderIndex)
83+
return nil
84+
}
85+
86+
if isMessageCompressed {
87+
guard let decompressor = self.decompressor else {
88+
// We cannot decompress the payload - throw an error.
89+
throw RPCError(
90+
code: .internalError,
91+
message: "Received a compressed message payload, but no decompressor has been configured."
92+
)
93+
}
94+
return try decompressor.decompress(&message, limit: self.maximumPayloadSize)
95+
} else {
96+
return Array(buffer: message)
97+
}
98+
}
99+
100+
mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
101+
try self.decode(buffer: &buffer)
102+
}
103+
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import GRPCCore
18+
import NIOCore
19+
import NIOTestUtils
20+
import XCTest
21+
22+
@testable import GRPCHTTP2Core
23+
24+
final class GRPCMessageDeframerTests: XCTestCase {
25+
func testReadMultipleMessagesWithoutCompression() throws {
26+
let firstMessage = {
27+
var buffer = ByteBuffer()
28+
buffer.writeInteger(UInt8(0))
29+
buffer.writeInteger(UInt32(16))
30+
buffer.writeRepeatingByte(42, count: 16)
31+
return buffer
32+
}()
33+
34+
let secondMessage = {
35+
var buffer = ByteBuffer()
36+
buffer.writeInteger(UInt8(0))
37+
buffer.writeInteger(UInt32(8))
38+
buffer.writeRepeatingByte(43, count: 8)
39+
return buffer
40+
}()
41+
42+
try ByteToMessageDecoderVerifier.verifyDecoder(
43+
inputOutputPairs: [
44+
(firstMessage, [Array(repeating: UInt8(42), count: 16)]),
45+
(secondMessage, [Array(repeating: UInt8(43), count: 8)]),
46+
]) {
47+
GRPCMessageDeframer()
48+
}
49+
}
50+
51+
func testReadMessageOverSizeLimitWithoutCompression() throws {
52+
let deframer = GRPCMessageDeframer(maximumPayloadSize: 100)
53+
let processor = NIOSingleStepByteToMessageProcessor(deframer)
54+
55+
var buffer = ByteBuffer()
56+
buffer.writeInteger(UInt8(0))
57+
buffer.writeInteger(UInt32(101))
58+
buffer.writeRepeatingByte(42, count: 101)
59+
60+
XCTAssertThrowsError(
61+
ofType: RPCError.self,
62+
try processor.process(buffer: buffer) { _ in
63+
XCTFail("No message should be produced.")
64+
}
65+
) { error in
66+
XCTAssertEqual(error.code, .resourceExhausted)
67+
XCTAssertEqual(
68+
error.message,
69+
"Message has exceeded the configured maximum payload size (max: 100, actual: 101)"
70+
)
71+
}
72+
}
73+
74+
func testReadMessageOverSizeLimitButWithoutActualMessageBytes() throws {
75+
let deframer = GRPCMessageDeframer(maximumPayloadSize: 100)
76+
let processor = NIOSingleStepByteToMessageProcessor(deframer)
77+
78+
var buffer = ByteBuffer()
79+
buffer.writeInteger(UInt8(0))
80+
// Set the message length field to be over the maximum payload size, but
81+
// don't write the actual message bytes. This is to ensure that the payload
82+
// size limit is enforced _before_ the payload is actually read.
83+
buffer.writeInteger(UInt32(101))
84+
85+
XCTAssertThrowsError(
86+
ofType: RPCError.self,
87+
try processor.process(buffer: buffer) { _ in
88+
XCTFail("No message should be produced.")
89+
}
90+
) { error in
91+
XCTAssertEqual(error.code, .resourceExhausted)
92+
XCTAssertEqual(
93+
error.message,
94+
"Message has exceeded the configured maximum payload size (max: 100, actual: 101)"
95+
)
96+
}
97+
}
98+
99+
func testCompressedMessageWithoutConfiguringDecompressor() throws {
100+
let deframer = GRPCMessageDeframer(maximumPayloadSize: 100)
101+
let processor = NIOSingleStepByteToMessageProcessor(deframer)
102+
103+
var buffer = ByteBuffer()
104+
buffer.writeInteger(UInt8(1))
105+
buffer.writeInteger(UInt32(10))
106+
buffer.writeRepeatingByte(42, count: 10)
107+
108+
XCTAssertThrowsError(
109+
ofType: RPCError.self,
110+
try processor.process(buffer: buffer) { _ in
111+
XCTFail("No message should be produced.")
112+
}
113+
) { error in
114+
XCTAssertEqual(error.code, .internalError)
115+
XCTAssertEqual(
116+
error.message,
117+
"Received a compressed message payload, but no decompressor has been configured."
118+
)
119+
}
120+
}
121+
122+
private func testReadMultipleMessagesWithCompression(method: Zlib.Method) throws {
123+
let decompressor = Zlib.Decompressor(method: method)
124+
let compressor = Zlib.Compressor(method: method)
125+
var framer = GRPCMessageFramer()
126+
defer {
127+
decompressor.end()
128+
compressor.end()
129+
}
130+
131+
let firstMessage = try {
132+
framer.append(Array(repeating: 42, count: 100))
133+
return try framer.next(compressor: compressor)!
134+
}()
135+
136+
let secondMessage = try {
137+
framer.append(Array(repeating: 43, count: 110))
138+
return try framer.next(compressor: compressor)!
139+
}()
140+
141+
try ByteToMessageDecoderVerifier.verifyDecoder(
142+
inputOutputPairs: [
143+
(firstMessage, [Array(repeating: 42, count: 100)]),
144+
(secondMessage, [Array(repeating: 43, count: 110)]),
145+
]) {
146+
GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor)
147+
}
148+
}
149+
150+
func testReadMultipleMessagesWithDeflateCompression() throws {
151+
try self.testReadMultipleMessagesWithCompression(method: .deflate)
152+
}
153+
154+
func testReadMultipleMessagesWithGZIPCompression() throws {
155+
try self.testReadMultipleMessagesWithCompression(method: .gzip)
156+
}
157+
158+
func testReadCompressedMessageOverSizeLimitBeforeDecompressing() throws {
159+
let deframer = GRPCMessageDeframer(maximumPayloadSize: 1)
160+
let processor = NIOSingleStepByteToMessageProcessor(deframer)
161+
let compressor = Zlib.Compressor(method: .gzip)
162+
var framer = GRPCMessageFramer()
163+
defer {
164+
compressor.end()
165+
}
166+
167+
framer.append(Array(repeating: 42, count: 100))
168+
let framedMessage = try framer.next(compressor: compressor)!
169+
170+
XCTAssertThrowsError(
171+
ofType: RPCError.self,
172+
try processor.process(buffer: framedMessage) { _ in
173+
XCTFail("No message should be produced.")
174+
}
175+
) { error in
176+
XCTAssertEqual(error.code, .resourceExhausted)
177+
XCTAssertEqual(
178+
error.message,
179+
"""
180+
Message has exceeded the configured maximum payload size \
181+
(max: 1, actual: \(framedMessage.readableBytes - GRPCMessageDeframer.metadataLength))
182+
"""
183+
)
184+
}
185+
}
186+
187+
private func testReadDecompressedMessageOverSizeLimit(method: Zlib.Method) throws {
188+
let decompressor = Zlib.Decompressor(method: method)
189+
let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor)
190+
let processor = NIOSingleStepByteToMessageProcessor(deframer)
191+
let compressor = Zlib.Compressor(method: method)
192+
var framer = GRPCMessageFramer()
193+
defer {
194+
decompressor.end()
195+
compressor.end()
196+
}
197+
198+
framer.append(Array(repeating: 42, count: 101))
199+
let framedMessage = try framer.next(compressor: compressor)!
200+
201+
XCTAssertThrowsError(
202+
ofType: RPCError.self,
203+
try processor.process(buffer: framedMessage) { _ in
204+
XCTFail("No message should be produced.")
205+
}
206+
) { error in
207+
XCTAssertEqual(error.code, .resourceExhausted)
208+
XCTAssertEqual(error.message, "Message is too large to decompress.")
209+
}
210+
}
211+
212+
func testReadDecompressedMessageOverSizeLimitWithDeflateCompression() throws {
213+
try self.testReadDecompressedMessageOverSizeLimit(method: .deflate)
214+
}
215+
216+
func testReadDecompressedMessageOverSizeLimitWithGZIPCompression() throws {
217+
try self.testReadDecompressedMessageOverSizeLimit(method: .gzip)
218+
}
219+
}

0 commit comments

Comments
 (0)