Skip to content

Commit 19d24ab

Browse files
authored
Add GRPCMessageFramer (#1768)
1 parent 3d74772 commit 19d24ab

File tree

5 files changed

+473
-0
lines changed

5 files changed

+473
-0
lines changed

Package.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ extension Target {
203203
.nioCore,
204204
.nioHTTP2,
205205
.cgrpcZlib,
206+
.dequeModule
206207
]
207208
)
208209

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import NIOCore
18+
19+
/// A ``GRPCMessageFramer`` helps with the framing of gRPC data frames:
20+
/// - It prepends data with the required metadata (compression flag and message length).
21+
/// - It compresses messages using the specified compression algorithm (if configured).
22+
/// - It coalesces multiple messages (appended into the `Framer` by calling ``append(_:compress:)``)
23+
/// into a single `ByteBuffer`.
24+
struct GRPCMessageFramer {
25+
/// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
26+
static let metadataLength = 5
27+
28+
/// Maximum size the `writeBuffer` can be when concatenating multiple frames.
29+
/// This limit will not be considered if only a single message/frame is written into the buffer, meaning
30+
/// frames with messages over 64KB can still be written.
31+
/// - Note: This is expressed as the power of 2 closer to 64KB (i.e., 64KiB) because `ByteBuffer`
32+
/// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
33+
static let maximumWriteBufferLength = 65_536
34+
35+
private var pendingMessages: OneOrManyQueue<PendingMessage>
36+
37+
private struct PendingMessage {
38+
let bytes: [UInt8]
39+
let compress: Bool
40+
}
41+
42+
private var writeBuffer: ByteBuffer
43+
44+
init() {
45+
self.pendingMessages = OneOrManyQueue()
46+
self.writeBuffer = ByteBuffer()
47+
}
48+
49+
/// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
50+
/// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
51+
/// If `compress` is true, then the given bytes will be compressed using the configured compression algorithm.
52+
mutating func append(_ bytes: [UInt8], compress: Bool) {
53+
self.pendingMessages.append(PendingMessage(bytes: bytes, compress: compress))
54+
}
55+
56+
/// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data.
57+
/// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`.
58+
/// - Throws: If an error is encountered, such as a compression failure, an error will be thrown.
59+
mutating func next() throws -> ByteBuffer? {
60+
if self.pendingMessages.isEmpty {
61+
// Nothing pending: exit early.
62+
return nil
63+
}
64+
65+
defer {
66+
// To avoid holding an excessively large buffer, if its size is larger than
67+
// our threshold (`maximumWriteBufferLength`), then reset it to a new `ByteBuffer`.
68+
if self.writeBuffer.capacity > Self.maximumWriteBufferLength {
69+
self.writeBuffer = ByteBuffer()
70+
}
71+
}
72+
73+
var requiredCapacity = 0
74+
for message in self.pendingMessages {
75+
requiredCapacity += message.bytes.count + Self.metadataLength
76+
}
77+
self.writeBuffer.clear(minimumCapacity: requiredCapacity)
78+
79+
while let message = self.pendingMessages.pop() {
80+
try self.encode(message)
81+
}
82+
83+
return self.writeBuffer
84+
}
85+
86+
private mutating func encode(_ message: PendingMessage) throws {
87+
if message.compress {
88+
self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag
89+
// TODO: compress message and write the compressed message length + bytes
90+
} else {
91+
self.writeBuffer.writeMultipleIntegers(
92+
UInt8(0), // Clear compression flag
93+
UInt32(message.bytes.count) // Set message length
94+
)
95+
self.writeBuffer.writeBytes(message.bytes)
96+
}
97+
}
98+
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import DequeModule
18+
19+
/// A FIFO-queue which allows for a single element to be stored on the stack and defers to a
20+
/// heap-implementation if further elements are added.
21+
///
22+
/// This is useful when optimising for unary streams where avoiding the cost of a heap
23+
/// allocation is desirable.
24+
internal struct OneOrManyQueue<Element>: Collection {
25+
private var backing: Backing
26+
27+
private enum Backing: Collection {
28+
case none
29+
case one(Element)
30+
case many(Deque<Element>)
31+
32+
var startIndex: Int {
33+
switch self {
34+
case .none, .one:
35+
return 0
36+
case let .many(elements):
37+
return elements.startIndex
38+
}
39+
}
40+
41+
var endIndex: Int {
42+
switch self {
43+
case .none:
44+
return 0
45+
case .one:
46+
return 1
47+
case let .many(elements):
48+
return elements.endIndex
49+
}
50+
}
51+
52+
subscript(index: Int) -> Element {
53+
switch self {
54+
case .none:
55+
fatalError("Invalid index")
56+
case let .one(element):
57+
assert(index == 0)
58+
return element
59+
case let .many(elements):
60+
return elements[index]
61+
}
62+
}
63+
64+
func index(after index: Int) -> Int {
65+
switch self {
66+
case .none:
67+
return 0
68+
case .one:
69+
return 1
70+
case let .many(elements):
71+
return elements.index(after: index)
72+
}
73+
}
74+
75+
var count: Int {
76+
switch self {
77+
case .none:
78+
return 0
79+
case .one:
80+
return 1
81+
case let .many(elements):
82+
return elements.count
83+
}
84+
}
85+
86+
var isEmpty: Bool {
87+
switch self {
88+
case .none:
89+
return true
90+
case .one:
91+
return false
92+
case let .many(elements):
93+
return elements.isEmpty
94+
}
95+
}
96+
97+
mutating func append(_ element: Element) {
98+
switch self {
99+
case .none:
100+
self = .one(element)
101+
case let .one(one):
102+
var elements = Deque<Element>()
103+
elements.reserveCapacity(16)
104+
elements.append(one)
105+
elements.append(element)
106+
self = .many(elements)
107+
case var .many(elements):
108+
self = .none
109+
elements.append(element)
110+
self = .many(elements)
111+
}
112+
}
113+
114+
mutating func pop() -> Element? {
115+
switch self {
116+
case .none:
117+
return nil
118+
case let .one(element):
119+
self = .none
120+
return element
121+
case var .many(many):
122+
self = .none
123+
let element = many.popFirst()
124+
self = .many(many)
125+
return element
126+
}
127+
}
128+
}
129+
130+
init() {
131+
self.backing = .none
132+
}
133+
134+
var isEmpty: Bool {
135+
return self.backing.isEmpty
136+
}
137+
138+
var count: Int {
139+
return self.backing.count
140+
}
141+
142+
var startIndex: Int {
143+
return self.backing.startIndex
144+
}
145+
146+
var endIndex: Int {
147+
return self.backing.endIndex
148+
}
149+
150+
subscript(index: Int) -> Element {
151+
return self.backing[index]
152+
}
153+
154+
func index(after index: Int) -> Int {
155+
return self.backing.index(after: index)
156+
}
157+
158+
mutating func append(_ element: Element) {
159+
self.backing.append(element)
160+
}
161+
162+
mutating func pop() -> Element? {
163+
return self.backing.pop()
164+
}
165+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import NIOCore
18+
import XCTest
19+
20+
@testable import GRPCHTTP2Core
21+
22+
final class GRPCMessageFramerTests: XCTestCase {
23+
func testSingleWrite() throws {
24+
var framer = GRPCMessageFramer()
25+
framer.append(Array(repeating: 42, count: 128), compress: false)
26+
27+
var buffer = try XCTUnwrap(framer.next())
28+
let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
29+
XCTAssertFalse(compressed)
30+
XCTAssertEqual(length, 128)
31+
XCTAssertEqual(buffer.readSlice(length: Int(length)), ByteBuffer(repeating: 42, count: 128))
32+
XCTAssertEqual(buffer.readableBytes, 0)
33+
34+
// No more bufers.
35+
XCTAssertNil(try framer.next())
36+
}
37+
38+
func testMultipleWrites() throws {
39+
var framer = GRPCMessageFramer()
40+
41+
let messages = 100
42+
for _ in 0 ..< messages {
43+
framer.append(Array(repeating: 42, count: 128), compress: false)
44+
}
45+
46+
var buffer = try XCTUnwrap(framer.next())
47+
for _ in 0 ..< messages {
48+
let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
49+
XCTAssertFalse(compressed)
50+
XCTAssertEqual(length, 128)
51+
XCTAssertEqual(buffer.readSlice(length: Int(length)), ByteBuffer(repeating: 42, count: 128))
52+
}
53+
54+
XCTAssertEqual(buffer.readableBytes, 0)
55+
56+
// No more bufers.
57+
XCTAssertNil(try framer.next())
58+
}
59+
}
60+
61+
extension ByteBuffer {
62+
mutating func readMessageHeader() -> (Bool, UInt32)? {
63+
if let (compressed, length) = self.readMultipleIntegers(as: (UInt8, UInt32).self) {
64+
return (compressed != 0, length)
65+
} else {
66+
return nil
67+
}
68+
}
69+
}

0 commit comments

Comments
 (0)