Skip to content

Commit 29bc8bc

Browse files
authored
Fix several bugs in GRPC stream handlers and state machine (#1844)
1 parent 97994e1 commit 29bc8bc

File tree

5 files changed

+321
-73
lines changed

5 files changed

+321
-73
lines changed

Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,25 @@ extension GRPCClientStreamHandler {
6767
switch frameData.data {
6868
case .byteBuffer(let buffer):
6969
do {
70-
try self.stateMachine.receive(buffer: buffer, endStream: endStream)
71-
loop: while true {
72-
switch self.stateMachine.nextInboundMessage() {
73-
case .receiveMessage(let message):
74-
context.fireChannelRead(self.wrapInboundOut(.message(message)))
75-
case .awaitMoreMessages:
76-
break loop
77-
case .noMoreMessages:
78-
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
79-
break loop
70+
switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) {
71+
case .endRPCAndForwardErrorStatus(let status):
72+
context.fireChannelRead(self.wrapInboundOut(.status(status, [:])))
73+
context.close(promise: nil)
74+
75+
case .readInbound:
76+
loop: while true {
77+
switch self.stateMachine.nextInboundMessage() {
78+
case .receiveMessage(let message):
79+
context.fireChannelRead(self.wrapInboundOut(.message(message)))
80+
case .awaitMoreMessages:
81+
break loop
82+
case .noMoreMessages:
83+
// This could only happen if the server sends a data frame with EOS
84+
// set, without sending status and trailers.
85+
// If this happens, we should have forwarded an error status above
86+
// so we should never reach this point. Do nothing.
87+
break loop
88+
}
8089
}
8190
}
8291
} catch {
@@ -105,6 +114,7 @@ extension GRPCClientStreamHandler {
105114

106115
case .receivedStatusAndMetadata(let status, let metadata):
107116
context.fireChannelRead(self.wrapInboundOut(.status(status, metadata)))
117+
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
108118

109119
case .doNothing:
110120
()
@@ -161,7 +171,29 @@ extension GRPCClientStreamHandler {
161171

162172
func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
163173
switch mode {
164-
case .output, .all:
174+
case .input:
175+
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
176+
promise?.succeed()
177+
178+
case .output:
179+
// We flush all pending messages and update the internal state machine's
180+
// state, but we don't close the outbound end of the channel, because
181+
// forwarding the close in this case would cause the HTTP2 stream handler
182+
// to close the whole channel (as the mode is ignored in its implementation).
183+
do {
184+
try self.stateMachine.closeOutbound()
185+
// Force a flush by calling _flush instead of flush
186+
// (otherwise, we'd skip flushing if we're in a read loop)
187+
self._flush(context: context)
188+
promise?.succeed()
189+
} catch {
190+
promise?.fail(error)
191+
context.fireErrorCaught(error)
192+
}
193+
194+
case .all:
195+
// Since we're closing the whole channel here, we *do* forward the close
196+
// down the pipeline.
165197
do {
166198
try self.stateMachine.closeOutbound()
167199
// Force a flush by calling _flush
@@ -172,9 +204,6 @@ extension GRPCClientStreamHandler {
172204
promise?.fail(error)
173205
context.fireErrorCaught(error)
174206
}
175-
176-
case .input:
177-
context.close(mode: .input, promise: promise)
178207
}
179208
}
180209

Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,12 @@ private enum GRPCStreamStateMachineState {
292292
self.inboundMessageBuffer = previousState.inboundMessageBuffer
293293
}
294294

295+
init(previousState: ClientOpenServerOpenState) {
296+
self.framer = previousState.framer
297+
self.compressor = previousState.compressor
298+
self.inboundMessageBuffer = previousState.inboundMessageBuffer
299+
}
300+
295301
init(previousState: ClientOpenServerClosedState) {
296302
self.framer = previousState.framer
297303
self.compressor = previousState.compressor
@@ -388,12 +394,24 @@ struct GRPCStreamStateMachine {
388394
}
389395
}
390396

391-
mutating func receive(buffer: ByteBuffer, endStream: Bool) throws {
397+
enum OnBufferReceivedAction: Equatable {
398+
case readInbound
399+
400+
// Client-specific actions
401+
402+
// This will be returned when the server sends a data frame with EOS set.
403+
// This is invalid as per the protocol specification, because the server
404+
// can only close by sending trailers, not by setting EOS when sending
405+
// a message.
406+
case endRPCAndForwardErrorStatus(Status)
407+
}
408+
409+
mutating func receive(buffer: ByteBuffer, endStream: Bool) throws -> OnBufferReceivedAction {
392410
switch self.configuration {
393411
case .client:
394-
try self.clientReceive(buffer: buffer, endStream: endStream)
412+
return try self.clientReceive(buffer: buffer, endStream: endStream)
395413
case .server:
396-
try self.serverReceive(buffer: buffer, endStream: endStream)
414+
return try self.serverReceive(buffer: buffer, endStream: endStream)
397415
}
398416
}
399417

@@ -729,7 +747,7 @@ extension GRPCStreamStateMachine {
729747
}
730748

731749
let statusMessage =
732-
metadata.firstString(forKey: .grpcStatusMessage)
750+
metadata.firstString(forKey: .grpcStatusMessage, canonicalForm: false)
733751
.map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? ""
734752

735753
var convertedMetadata = Metadata(headers: metadata)
@@ -860,38 +878,68 @@ extension GRPCStreamStateMachine {
860878
}
861879
}
862880

863-
private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws {
881+
private mutating func clientReceive(
882+
buffer: ByteBuffer,
883+
endStream: Bool
884+
) throws -> OnBufferReceivedAction {
864885
// This is a message received by the client, from the server.
865886
switch self.state {
866887
case .clientIdleServerIdle:
867888
try self.invalidState(
868889
"Cannot have received anything from server if client is not yet open."
869890
)
891+
870892
case .clientOpenServerIdle, .clientClosedServerIdle:
871893
try self.invalidState(
872894
"Server cannot have sent a message before sending the initial metadata."
873895
)
896+
874897
case .clientOpenServerOpen(var state):
898+
if endStream {
899+
// This is invalid as per the protocol specification, because the server
900+
// can only close by sending trailers, not by setting EOS when sending
901+
// a message.
902+
self.state = .clientClosedServerClosed(.init(previousState: state))
903+
return .endRPCAndForwardErrorStatus(
904+
Status(
905+
code: .internalError,
906+
message: """
907+
Server sent EOS alongside a data frame, but server is only allowed \
908+
to close by sending status and trailers.
909+
"""
910+
)
911+
)
912+
}
913+
875914
try state.deframer.process(buffer: buffer) { deframedMessage in
876915
state.inboundMessageBuffer.append(deframedMessage)
877916
}
917+
self.state = .clientOpenServerOpen(state)
918+
return .readInbound
919+
920+
case .clientClosedServerOpen(var state):
878921
if endStream {
879-
self.state = .clientOpenServerClosed(.init(previousState: state))
880-
} else {
881-
self.state = .clientOpenServerOpen(state)
922+
self.state = .clientClosedServerClosed(.init(previousState: state))
923+
return .endRPCAndForwardErrorStatus(
924+
Status(
925+
code: .internalError,
926+
message: """
927+
Server sent EOS alongside a data frame, but server is only allowed \
928+
to close by sending status and trailers.
929+
"""
930+
)
931+
)
882932
}
883-
case .clientClosedServerOpen(var state):
933+
884934
// The client may have sent the end stream and thus it's closed,
885935
// but the server may still be responding.
886936
// The client must have a deframer set up, so force-unwrap is okay.
887937
try state.deframer!.process(buffer: buffer) { deframedMessage in
888938
state.inboundMessageBuffer.append(deframedMessage)
889939
}
890-
if endStream {
891-
self.state = .clientClosedServerClosed(.init(previousState: state))
892-
} else {
893-
self.state = .clientClosedServerOpen(state)
894-
}
940+
self.state = .clientClosedServerOpen(state)
941+
return .readInbound
942+
895943
case .clientOpenServerClosed, .clientClosedServerClosed:
896944
try self.invalidState(
897945
"Cannot have received anything from a closed server."
@@ -1314,7 +1362,10 @@ extension GRPCStreamStateMachine {
13141362
}
13151363
}
13161364

1317-
private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws {
1365+
private mutating func serverReceive(
1366+
buffer: ByteBuffer,
1367+
endStream: Bool
1368+
) throws -> OnBufferReceivedAction {
13181369
switch self.state {
13191370
case .clientIdleServerIdle:
13201371
try self.invalidState(
@@ -1354,6 +1405,7 @@ extension GRPCStreamStateMachine {
13541405
"Client can't send a message if closed."
13551406
)
13561407
}
1408+
return .readInbound
13571409
}
13581410

13591411
private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
@@ -1443,10 +1495,11 @@ internal enum GRPCHTTP2Keys: String {
14431495
}
14441496

14451497
extension HPACKHeaders {
1446-
internal func firstString(forKey key: GRPCHTTP2Keys) -> String? {
1447-
self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map {
1448-
String($0)
1449-
}
1498+
internal func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? {
1499+
self.values(forHeader: key.rawValue, canonicalForm: canonicalForm).first(where: { _ in true })
1500+
.map {
1501+
String($0)
1502+
}
14501503
}
14511504

14521505
internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {

Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,22 @@ extension GRPCServerStreamHandler {
6464
switch frameData.data {
6565
case .byteBuffer(let buffer):
6666
do {
67-
try self.stateMachine.receive(buffer: buffer, endStream: endStream)
68-
loop: while true {
69-
switch self.stateMachine.nextInboundMessage() {
70-
case .receiveMessage(let message):
71-
context.fireChannelRead(self.wrapInboundOut(.message(message)))
72-
case .awaitMoreMessages:
73-
break loop
74-
case .noMoreMessages:
75-
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
76-
break loop
67+
switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) {
68+
case .endRPCAndForwardErrorStatus:
69+
preconditionFailure(
70+
"OnBufferReceivedAction.endRPCAndForwardErrorStatus should never be returned for the server."
71+
)
72+
case .readInbound:
73+
loop: while true {
74+
switch self.stateMachine.nextInboundMessage() {
75+
case .receiveMessage(let message):
76+
context.fireChannelRead(self.wrapInboundOut(.message(message)))
77+
case .awaitMoreMessages:
78+
break loop
79+
case .noMoreMessages:
80+
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
81+
break loop
82+
}
7783
}
7884
}
7985
} catch {

Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,10 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
305305
buffer.writeInteger(UInt8(0)) // not compressed
306306
buffer.writeInteger(UInt32(42)) // message length
307307
buffer.writeRepeatingByte(0, count: 42) // message
308-
let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
308+
let clientDataPayload = HTTP2Frame.FramePayload.Data(
309+
data: .byteBuffer(buffer),
310+
endStream: false
311+
)
309312
XCTAssertThrowsError(
310313
ofType: RPCError.self,
311314
try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
@@ -321,6 +324,74 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
321324
XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
322325
}
323326

327+
func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
328+
let handler = GRPCClientStreamHandler(
329+
methodDescriptor: .init(service: "test", method: "test"),
330+
scheme: .http,
331+
outboundEncoding: .identity,
332+
acceptedEncodings: [],
333+
maximumPayloadSize: 100,
334+
skipStateMachineAssertions: true
335+
)
336+
337+
let channel = EmbeddedChannel(handler: handler)
338+
339+
// Send client's initial metadata
340+
XCTAssertNoThrow(
341+
try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
342+
)
343+
344+
// Make sure we have sent right metadata.
345+
let writtenMetadata = try channel.assertReadHeadersOutbound()
346+
347+
XCTAssertEqual(
348+
writtenMetadata.headers,
349+
[
350+
GRPCHTTP2Keys.method.rawValue: "POST",
351+
GRPCHTTP2Keys.scheme.rawValue: "http",
352+
GRPCHTTP2Keys.path.rawValue: "test/test",
353+
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
354+
GRPCHTTP2Keys.te.rawValue: "trailers",
355+
]
356+
)
357+
358+
// Server sends initial metadata
359+
let serverInitialMetadata: HPACKHeaders = [
360+
GRPCHTTP2Keys.status.rawValue: "200",
361+
GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
362+
]
363+
XCTAssertNoThrow(
364+
try channel.writeInbound(
365+
HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
366+
)
367+
)
368+
XCTAssertEqual(
369+
try channel.readInbound(as: RPCResponsePart.self),
370+
.metadata(Metadata(headers: serverInitialMetadata))
371+
)
372+
373+
// Server sends message with EOS set.
374+
var buffer = ByteBuffer()
375+
buffer.writeInteger(UInt8(0)) // not compressed
376+
buffer.writeInteger(UInt32(42)) // message length
377+
buffer.writeRepeatingByte(0, count: 42) // message
378+
let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
379+
XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
380+
381+
// Make sure we got status + trailers with the right error.
382+
XCTAssertEqual(
383+
try channel.readInbound(as: RPCResponsePart.self),
384+
.status(
385+
Status(
386+
code: .internalError,
387+
message:
388+
"Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
389+
),
390+
[:]
391+
)
392+
)
393+
}
394+
324395
func testServerEndsStream() throws {
325396
let handler = GRPCClientStreamHandler(
326397
methodDescriptor: .init(service: "test", method: "test"),

0 commit comments

Comments
 (0)