Skip to content

Commit 97994e1

Browse files
authored
Handle write promises correctly (#1843)
1 parent 847a934 commit 97994e1

File tree

7 files changed

+272
-189
lines changed

7 files changed

+272
-189
lines changed

Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -143,24 +143,18 @@ extension GRPCClientStreamHandler {
143143
do {
144144
self.flushPending = true
145145
let headers = try self.stateMachine.send(metadata: metadata)
146-
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil)
147-
// TODO: move the promise handling into the state machine
148-
promise?.succeed()
146+
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise)
149147
} catch {
150-
context.fireErrorCaught(error)
151-
// TODO: move the promise handling into the state machine
152148
promise?.fail(error)
149+
context.fireErrorCaught(error)
153150
}
154151

155152
case .message(let message):
156153
do {
157-
try self.stateMachine.send(message: message)
158-
// TODO: move the promise handling into the state machine
159-
promise?.succeed()
154+
try self.stateMachine.send(message: message, promise: promise)
160155
} catch {
161-
context.fireErrorCaught(error)
162-
// TODO: move the promise handling into the state machine
163156
promise?.fail(error)
157+
context.fireErrorCaught(error)
164158
}
165159
}
166160
}
@@ -197,12 +191,12 @@ extension GRPCClientStreamHandler {
197191
private func _flush(context: ChannelHandlerContext) {
198192
do {
199193
loop: while true {
200-
switch try self.stateMachine.nextOutboundMessage() {
201-
case .sendMessage(let byteBuffer):
194+
switch try self.stateMachine.nextOutboundFrame() {
195+
case .sendFrame(let byteBuffer, let promise):
202196
self.flushPending = true
203197
context.write(
204198
self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))),
205-
promise: nil
199+
promise: promise
206200
)
207201

208202
case .noMoreMessages:

Sources/GRPCHTTP2Core/GRPCMessageFramer.swift

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ struct GRPCMessageFramer {
3232
/// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
3333
static let maximumWriteBufferLength = 65_536
3434

35-
private var pendingMessages: OneOrManyQueue<[UInt8]>
35+
private var pendingMessages: OneOrManyQueue<(bytes: [UInt8], promise: EventLoopPromise<Void>?)>
3636

3737
private var writeBuffer: ByteBuffer
3838

@@ -44,16 +44,18 @@ struct GRPCMessageFramer {
4444

4545
/// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
4646
/// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
47-
mutating func append(_ bytes: [UInt8]) {
48-
self.pendingMessages.append(bytes)
47+
mutating func append(_ bytes: [UInt8], promise: EventLoopPromise<Void>?) {
48+
self.pendingMessages.append((bytes, promise))
4949
}
5050

5151
/// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data.
5252
/// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`.
5353
/// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise
5454
/// they'll be framed as-is.
5555
/// - Throws: If an error is encountered, such as a compression failure, an error will be thrown.
56-
mutating func next(compressor: Zlib.Compressor? = nil) throws -> ByteBuffer? {
56+
mutating func next(
57+
compressor: Zlib.Compressor? = nil
58+
) throws -> (bytes: ByteBuffer, promise: EventLoopPromise<Void>?)? {
5759
if self.pendingMessages.isEmpty {
5860
// Nothing pending: exit early.
5961
return nil
@@ -69,15 +71,21 @@ struct GRPCMessageFramer {
6971

7072
var requiredCapacity = 0
7173
for message in self.pendingMessages {
72-
requiredCapacity += message.count + Self.metadataLength
74+
requiredCapacity += message.bytes.count + Self.metadataLength
7375
}
7476
self.writeBuffer.clear(minimumCapacity: requiredCapacity)
7577

78+
var pendingWritePromise: EventLoopPromise<Void>?
7679
while let message = self.pendingMessages.pop() {
77-
try self.encode(message, compressor: compressor)
80+
try self.encode(message.bytes, compressor: compressor)
81+
if let existingPendingWritePromise = pendingWritePromise {
82+
existingPendingWritePromise.futureResult.cascade(to: message.promise)
83+
} else {
84+
pendingWritePromise = message.promise
85+
}
7886
}
7987

80-
return self.writeBuffer
88+
return (bytes: self.writeBuffer, promise: pendingWritePromise)
8189
}
8290

8391
private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws {

Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -325,12 +325,12 @@ struct GRPCStreamStateMachine {
325325
}
326326
}
327327

328-
mutating func send(message: [UInt8]) throws {
328+
mutating func send(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
329329
switch self.configuration {
330330
case .client:
331-
try self.clientSend(message: message)
331+
try self.clientSend(message: message, promise: promise)
332332
case .server:
333-
try self.serverSend(message: message)
333+
try self.serverSend(message: message, promise: promise)
334334
}
335335
}
336336

@@ -397,23 +397,26 @@ struct GRPCStreamStateMachine {
397397
}
398398
}
399399

400-
/// The result of requesting the next outbound message.
401-
enum OnNextOutboundMessage: Equatable {
402-
/// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done
400+
/// The result of requesting the next outbound frame, which may contain multiple messages.
401+
enum OnNextOutboundFrame {
402+
/// Either the receiving party is closed, so we shouldn't send any more frames; or the sender is done
403403
/// writing messages (i.e. we are now closed).
404404
case noMoreMessages
405-
/// There isn't a message ready to be sent, but we could still receive more, so keep trying.
405+
/// There isn't a frame ready to be sent, but we could still receive more messages, so keep trying.
406406
case awaitMoreMessages
407-
/// A message is ready to be sent.
408-
case sendMessage(ByteBuffer)
407+
/// A frame is ready to be sent.
408+
case sendFrame(
409+
frame: ByteBuffer,
410+
promise: EventLoopPromise<Void>?
411+
)
409412
}
410413

411-
mutating func nextOutboundMessage() throws -> OnNextOutboundMessage {
414+
mutating func nextOutboundFrame() throws -> OnNextOutboundFrame {
412415
switch self.configuration {
413416
case .client:
414-
return try self.clientNextOutboundMessage()
417+
return try self.clientNextOutboundFrame()
415418
case .server:
416-
return try self.serverNextOutboundMessage()
419+
return try self.serverNextOutboundFrame()
417420
}
418421
}
419422

@@ -540,15 +543,15 @@ extension GRPCStreamStateMachine {
540543
}
541544
}
542545

543-
private mutating func clientSend(message: [UInt8]) throws {
546+
private mutating func clientSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
544547
switch self.state {
545548
case .clientIdleServerIdle:
546549
try self.invalidState("Client not yet open.")
547550
case .clientOpenServerIdle(var state):
548-
state.framer.append(message)
551+
state.framer.append(message, promise: promise)
549552
self.state = .clientOpenServerIdle(state)
550553
case .clientOpenServerOpen(var state):
551-
state.framer.append(message)
554+
state.framer.append(message, promise: promise)
552555
self.state = .clientOpenServerOpen(state)
553556
case .clientOpenServerClosed:
554557
// The server has closed, so it makes no sense to send the rest of the request.
@@ -577,31 +580,33 @@ extension GRPCStreamStateMachine {
577580

578581
/// Returns the client's next request to the server.
579582
/// - Returns: The request to be made to the server.
580-
private mutating func clientNextOutboundMessage() throws -> OnNextOutboundMessage {
583+
private mutating func clientNextOutboundFrame() throws -> OnNextOutboundFrame {
581584
switch self.state {
582585
case .clientIdleServerIdle:
583586
try self.invalidState("Client is not open yet.")
584587
case .clientOpenServerIdle(var state):
585588
let request = try state.framer.next(compressor: state.compressor)
586589
self.state = .clientOpenServerIdle(state)
587-
return request.map { .sendMessage($0) } ?? .awaitMoreMessages
590+
return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
591+
?? .awaitMoreMessages
588592
case .clientOpenServerOpen(var state):
589593
let request = try state.framer.next(compressor: state.compressor)
590594
self.state = .clientOpenServerOpen(state)
591-
return request.map { .sendMessage($0) } ?? .awaitMoreMessages
595+
return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
596+
?? .awaitMoreMessages
592597
case .clientClosedServerIdle(var state):
593598
let request = try state.framer.next(compressor: state.compressor)
594599
self.state = .clientClosedServerIdle(state)
595600
if let request {
596-
return .sendMessage(request)
601+
return .sendFrame(frame: request.bytes, promise: request.promise)
597602
} else {
598603
return .noMoreMessages
599604
}
600605
case .clientClosedServerOpen(var state):
601606
let request = try state.framer.next(compressor: state.compressor)
602607
self.state = .clientClosedServerOpen(state)
603608
if let request {
604-
return .sendMessage(request)
609+
return .sendFrame(frame: request.bytes, promise: request.promise)
605610
} else {
606611
return .noMoreMessages
607612
}
@@ -1003,17 +1008,17 @@ extension GRPCStreamStateMachine {
10031008
}
10041009
}
10051010

1006-
private mutating func serverSend(message: [UInt8]) throws {
1011+
private mutating func serverSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
10071012
switch self.state {
10081013
case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
10091014
try self.invalidState(
10101015
"Server must have sent initial metadata before sending a message."
10111016
)
10121017
case .clientOpenServerOpen(var state):
1013-
state.framer.append(message)
1018+
state.framer.append(message, promise: promise)
10141019
self.state = .clientOpenServerOpen(state)
10151020
case .clientClosedServerOpen(var state):
1016-
state.framer.append(message)
1021+
state.framer.append(message, promise: promise)
10171022
self.state = .clientClosedServerOpen(state)
10181023
case .clientOpenServerClosed, .clientClosedServerClosed:
10191024
try self.invalidState(
@@ -1351,31 +1356,33 @@ extension GRPCStreamStateMachine {
13511356
}
13521357
}
13531358

1354-
private mutating func serverNextOutboundMessage() throws -> OnNextOutboundMessage {
1359+
private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
13551360
switch self.state {
13561361
case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
13571362
try self.invalidState("Server is not open yet.")
13581363
case .clientOpenServerOpen(var state):
13591364
let response = try state.framer.next(compressor: state.compressor)
13601365
self.state = .clientOpenServerOpen(state)
1361-
return response.map { .sendMessage($0) } ?? .awaitMoreMessages
1366+
return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
1367+
?? .awaitMoreMessages
13621368
case .clientClosedServerOpen(var state):
13631369
let response = try state.framer.next(compressor: state.compressor)
13641370
self.state = .clientClosedServerOpen(state)
1365-
return response.map { .sendMessage($0) } ?? .awaitMoreMessages
1371+
return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
1372+
?? .awaitMoreMessages
13661373
case .clientOpenServerClosed(var state):
13671374
let response = try state.framer?.next(compressor: state.compressor)
13681375
self.state = .clientOpenServerClosed(state)
13691376
if let response {
1370-
return .sendMessage(response)
1377+
return .sendFrame(frame: response.bytes, promise: response.promise)
13711378
} else {
13721379
return .noMoreMessages
13731380
}
13741381
case .clientClosedServerClosed(var state):
13751382
let response = try state.framer?.next(compressor: state.compressor)
13761383
self.state = .clientClosedServerClosed(state)
13771384
if let response {
1378-
return .sendMessage(response)
1385+
return .sendFrame(frame: response.bytes, promise: response.promise)
13791386
} else {
13801387
return .noMoreMessages
13811388
}

Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ final class GRPCServerStreamHandler: ChannelDuplexHandler {
3434
// We buffer the final status + trailers to avoid reordering issues (i.e.,
3535
// if there are messages still not written into the channel because flush has
3636
// not been called, but the server sends back trailers).
37-
private var pendingTrailers: HTTP2Frame.FramePayload?
37+
private var pendingTrailers:
38+
(trailers: HTTP2Frame.FramePayload, promise: EventLoopPromise<Void>?)?
3839

3940
init(
4041
scheme: Scheme,
@@ -142,37 +143,28 @@ extension GRPCServerStreamHandler {
142143
do {
143144
self.flushPending = true
144145
let headers = try self.stateMachine.send(metadata: metadata)
145-
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil)
146-
// TODO: move the promise handling into the state machine
147-
promise?.succeed()
146+
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise)
148147
} catch {
149-
context.fireErrorCaught(error)
150-
// TODO: move the promise handling into the state machine
151148
promise?.fail(error)
149+
context.fireErrorCaught(error)
152150
}
153151

154152
case .message(let message):
155153
do {
156-
try self.stateMachine.send(message: message)
157-
// TODO: move the promise handling into the state machine
158-
promise?.succeed()
154+
try self.stateMachine.send(message: message, promise: promise)
159155
} catch {
160-
context.fireErrorCaught(error)
161-
// TODO: move the promise handling into the state machine
162156
promise?.fail(error)
157+
context.fireErrorCaught(error)
163158
}
164159

165160
case .status(let status, let metadata):
166161
do {
167162
let headers = try self.stateMachine.send(status: status, metadata: metadata)
168163
let response = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true))
169-
self.pendingTrailers = response
170-
// TODO: move the promise handling into the state machine
171-
promise?.succeed()
164+
self.pendingTrailers = (response, promise)
172165
} catch {
173-
context.fireErrorCaught(error)
174-
// TODO: move the promise handling into the state machine
175166
promise?.fail(error)
167+
context.fireErrorCaught(error)
176168
}
177169
}
178170
}
@@ -185,19 +177,22 @@ extension GRPCServerStreamHandler {
185177

186178
do {
187179
loop: while true {
188-
switch try self.stateMachine.nextOutboundMessage() {
189-
case .sendMessage(let byteBuffer):
180+
switch try self.stateMachine.nextOutboundFrame() {
181+
case .sendFrame(let byteBuffer, let promise):
190182
self.flushPending = true
191183
context.write(
192184
self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))),
193-
promise: nil
185+
promise: promise
194186
)
195187

196188
case .noMoreMessages:
197189
if let pendingTrailers = self.pendingTrailers {
198190
self.flushPending = true
199191
self.pendingTrailers = nil
200-
context.write(self.wrapOutboundOut(pendingTrailers), promise: nil)
192+
context.write(
193+
self.wrapOutboundOut(pendingTrailers.trailers),
194+
promise: pendingTrailers.promise
195+
)
201196
}
202197
break loop
203198

0 commit comments

Comments
 (0)