Skip to content

Commit a260bb3

Browse files
authored
Add compression logic to GRPCMessageFramer (#1771)
1 parent 19d24ab commit a260bb3

File tree

3 files changed

+67
-19
lines changed

3 files changed

+67
-19
lines changed

Sources/GRPCHTTP2Core/Compression/Zlib.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ extension Zlib {
3838
/// Creates a new compressor for the given compression format.
3939
///
4040
/// This compressor is only suitable for compressing whole messages at a time.
41+
///
42+
/// - Important: ``Compressor/end()`` must be called when the compressor is not needed
43+
/// anymore, to deallocate any resources allocated by `Zlib`.
4144
struct Compressor {
4245
// TODO: Make this ~Copyable when 5.9 is the lowest supported Swift version.
4346

@@ -86,6 +89,9 @@ extension Zlib {
8689
/// Creates a new decompressor for the given compression format.
8790
///
8891
/// This decompressor is only suitable for compressing whole messages at a time.
92+
///
93+
/// - Important: ``Decompressor/end()`` must be called when the compressor is not needed
94+
/// anymore, to deallocate any resources allocated by `Zlib`.
8995
struct Decompressor {
9096
// TODO: Make this ~Copyable when 5.9 is the lowest supported Swift version.
9197

Sources/GRPCHTTP2Core/GRPCMessageFramer.swift

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,31 +32,28 @@ struct GRPCMessageFramer {
3232
/// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
3333
static let maximumWriteBufferLength = 65_536
3434

35-
private var pendingMessages: OneOrManyQueue<PendingMessage>
36-
37-
private struct PendingMessage {
38-
let bytes: [UInt8]
39-
let compress: Bool
40-
}
35+
private var pendingMessages: OneOrManyQueue<[UInt8]>
4136

4237
private var writeBuffer: ByteBuffer
4338

39+
/// Create a new ``GRPCMessageFramer``.
4440
init() {
4541
self.pendingMessages = OneOrManyQueue()
4642
self.writeBuffer = ByteBuffer()
4743
}
4844

4945
/// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
5046
/// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
51-
/// If `compress` is true, then the given bytes will be compressed using the configured compression algorithm.
52-
mutating func append(_ bytes: [UInt8], compress: Bool) {
53-
self.pendingMessages.append(PendingMessage(bytes: bytes, compress: compress))
47+
mutating func append(_ bytes: [UInt8]) {
48+
self.pendingMessages.append(bytes)
5449
}
5550

5651
/// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data.
5752
/// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`.
53+
/// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise
54+
/// they'll be framed as-is.
5855
/// - Throws: If an error is encountered, such as a compression failure, an error will be thrown.
59-
mutating func next() throws -> ByteBuffer? {
56+
mutating func next(compressor: Zlib.Compressor? = nil) throws -> ByteBuffer? {
6057
if self.pendingMessages.isEmpty {
6158
// Nothing pending: exit early.
6259
return nil
@@ -72,27 +69,34 @@ struct GRPCMessageFramer {
7269

7370
var requiredCapacity = 0
7471
for message in self.pendingMessages {
75-
requiredCapacity += message.bytes.count + Self.metadataLength
72+
requiredCapacity += message.count + Self.metadataLength
7673
}
7774
self.writeBuffer.clear(minimumCapacity: requiredCapacity)
7875

7976
while let message = self.pendingMessages.pop() {
80-
try self.encode(message)
77+
try self.encode(message, compressor: compressor)
8178
}
8279

8380
return self.writeBuffer
8481
}
8582

86-
private mutating func encode(_ message: PendingMessage) throws {
87-
if message.compress {
83+
private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws {
84+
if let compressor {
8885
self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag
89-
// TODO: compress message and write the compressed message length + bytes
86+
87+
// Write zeroes as length - we'll write the actual compressed size after compression.
88+
let lengthIndex = self.writeBuffer.writerIndex
89+
self.writeBuffer.writeInteger(UInt32(0))
90+
91+
// Compress and overwrite the payload length field with the right length.
92+
let writtenBytes = try compressor.compress(message, into: &self.writeBuffer)
93+
self.writeBuffer.setInteger(UInt32(writtenBytes), at: lengthIndex)
9094
} else {
9195
self.writeBuffer.writeMultipleIntegers(
9296
UInt8(0), // Clear compression flag
93-
UInt32(message.bytes.count) // Set message length
97+
UInt32(message.count) // Set message length
9498
)
95-
self.writeBuffer.writeBytes(message.bytes)
99+
self.writeBuffer.writeBytes(message)
96100
}
97101
}
98102
}

Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import XCTest
2222
final class GRPCMessageFramerTests: XCTestCase {
2323
func testSingleWrite() throws {
2424
var framer = GRPCMessageFramer()
25-
framer.append(Array(repeating: 42, count: 128), compress: false)
25+
framer.append(Array(repeating: 42, count: 128))
2626

2727
var buffer = try XCTUnwrap(framer.next())
2828
let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
@@ -35,12 +35,49 @@ final class GRPCMessageFramerTests: XCTestCase {
3535
XCTAssertNil(try framer.next())
3636
}
3737

38+
private func testSingleWrite(compressionMethod: Zlib.Method) throws {
39+
let compressor = Zlib.Compressor(method: compressionMethod)
40+
defer {
41+
compressor.end()
42+
}
43+
var framer = GRPCMessageFramer()
44+
45+
let message = [UInt8](repeating: 42, count: 128)
46+
framer.append(message)
47+
48+
var buffer = ByteBuffer()
49+
let testCompressor = Zlib.Compressor(method: compressionMethod)
50+
let compressedSize = try testCompressor.compress(message, into: &buffer)
51+
let compressedMessage = buffer.readSlice(length: compressedSize)
52+
defer {
53+
testCompressor.end()
54+
}
55+
56+
buffer = try XCTUnwrap(framer.next(compressor: compressor))
57+
let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
58+
XCTAssertTrue(compressed)
59+
XCTAssertEqual(length, UInt32(compressedSize))
60+
XCTAssertEqual(buffer.readSlice(length: Int(length)), compressedMessage)
61+
XCTAssertEqual(buffer.readableBytes, 0)
62+
63+
// No more bufers.
64+
XCTAssertNil(try framer.next())
65+
}
66+
67+
func testSingleWriteDeflateCompressed() throws {
68+
try self.testSingleWrite(compressionMethod: .deflate)
69+
}
70+
71+
func testSingleWriteGZIPCompressed() throws {
72+
try self.testSingleWrite(compressionMethod: .gzip)
73+
}
74+
3875
func testMultipleWrites() throws {
3976
var framer = GRPCMessageFramer()
4077

4178
let messages = 100
4279
for _ in 0 ..< messages {
43-
framer.append(Array(repeating: 42, count: 128), compress: false)
80+
framer.append(Array(repeating: 42, count: 128))
4481
}
4582

4683
var buffer = try XCTUnwrap(framer.next())
@@ -56,6 +93,7 @@ final class GRPCMessageFramerTests: XCTestCase {
5693
// No more bufers.
5794
XCTAssertNil(try framer.next())
5895
}
96+
5997
}
6098

6199
extension ByteBuffer {

0 commit comments

Comments
 (0)