Skip to content

Commit 3c65b40

Browse files
authored
Add missing transitions to GRPCStreamStateMachine (#1831)
1 parent d292251 commit 3c65b40

File tree

2 files changed

+264
-13
lines changed

2 files changed

+264
-13
lines changed

Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

Lines changed: 93 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,29 @@ private enum GRPCStreamStateMachineState {
110110
}
111111

112112
struct ClientOpenServerClosedState {
113-
var framer: GRPCMessageFramer
113+
var framer: GRPCMessageFramer?
114114
var compressor: Zlib.Compressor?
115115

116116
let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
117117
var decompressor: Zlib.Decompressor?
118118

119119
var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
120120

121+
// This transition should only happen on the server-side when, upon receiving
122+
// initial client metadata, some of the headers are invalid and we must reject
123+
// the RPC.
124+
// We will mark the client as open (because it sent initial metadata albeit
125+
// invalid) but we'll close the server, meaning all future messages sent from
126+
// the client will be ignored. Because of this, we won't need to frame or
127+
// deframe any messages, as we won't be reading or writing any messages.
128+
init(previousState: ClientIdleServerIdleState) {
129+
self.framer = nil
130+
self.compressor = nil
131+
self.deframer = nil
132+
self.decompressor = nil
133+
self.inboundMessageBuffer = .init()
134+
}
135+
121136
init(previousState: ClientOpenServerOpenState) {
122137
self.framer = previousState.framer
123138
self.compressor = previousState.compressor
@@ -240,12 +255,25 @@ private enum GRPCStreamStateMachineState {
240255
// We still need the framer and compressor in case the server has closed
241256
// but its buffer is not yet empty and still needs to send messages out to
242257
// the client.
243-
var framer: GRPCMessageFramer
258+
var framer: GRPCMessageFramer?
244259
var compressor: Zlib.Compressor?
245260

246261
// These are already deframed, so we don't need the deframer anymore.
247262
var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
248263

264+
// This transition should only happen on the server-side when, upon receiving
265+
// initial client metadata, some of the headers are invalid and we must reject
266+
// the RPC.
267+
// We will mark the client as closed (because it set the EOS flag, even if
268+
// the initial metadata was invalid) and we'll close the server too.
269+
// Because of this, we won't need to frame any messages, as we
270+
// won't be writing any messages.
271+
init(previousState: ClientIdleServerIdleState) {
272+
self.framer = nil
273+
self.compressor = nil
274+
self.inboundMessageBuffer = .init()
275+
}
276+
249277
init(previousState: ClientClosedServerOpenState) {
250278
self.framer = previousState.framer
251279
self.compressor = previousState.compressor
@@ -1062,6 +1090,21 @@ extension GRPCStreamStateMachine {
10621090
}
10631091
}
10641092

1093+
mutating private func closeServerAndBuildRejectRPCAction(
1094+
currentState: GRPCStreamStateMachineState.ClientIdleServerIdleState,
1095+
endStream: Bool,
1096+
rejectWithStatus status: Status
1097+
) -> OnMetadataReceived {
1098+
if endStream {
1099+
self.state = .clientClosedServerClosed(.init(previousState: currentState))
1100+
} else {
1101+
self.state = .clientOpenServerClosed(.init(previousState: currentState))
1102+
}
1103+
1104+
let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
1105+
return .rejectRPC(trailers: trailers)
1106+
}
1107+
10651108
private mutating func serverReceive(
10661109
metadata: HPACKHeaders,
10671110
endStream: Bool,
@@ -1071,7 +1114,9 @@ extension GRPCStreamStateMachine {
10711114
case .clientIdleServerIdle(let state):
10721115
let contentType = metadata.firstString(forKey: .contentType)
10731116
.flatMap { ContentType(value: $0) }
1074-
guard contentType != nil else {
1117+
if contentType == nil {
1118+
self.state = .clientOpenServerClosed(.init(previousState: state))
1119+
10751120
// Respond with HTTP-level Unsupported Media Type status code.
10761121
var trailers = HPACKHeaders()
10771122
trailers.add("415", forKey: .status)
@@ -1080,13 +1125,50 @@ extension GRPCStreamStateMachine {
10801125

10811126
let path = metadata.firstString(forKey: .path)
10821127
.flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
1083-
guard path != nil else {
1084-
let status = Status(
1085-
code: .unimplemented,
1086-
message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
1128+
if path == nil {
1129+
return self.closeServerAndBuildRejectRPCAction(
1130+
currentState: state,
1131+
endStream: endStream,
1132+
rejectWithStatus: Status(
1133+
code: .unimplemented,
1134+
message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
1135+
)
1136+
)
1137+
}
1138+
1139+
let scheme = metadata.firstString(forKey: .scheme)
1140+
.flatMap { Scheme(rawValue: $0) }
1141+
if scheme == nil {
1142+
return self.closeServerAndBuildRejectRPCAction(
1143+
currentState: state,
1144+
endStream: endStream,
1145+
rejectWithStatus: Status(
1146+
code: .invalidArgument,
1147+
message: ":scheme header must be present and one of \"http\" or \"https\"."
1148+
)
1149+
)
1150+
}
1151+
1152+
guard let method = metadata.firstString(forKey: .method), method == "POST" else {
1153+
return self.closeServerAndBuildRejectRPCAction(
1154+
currentState: state,
1155+
endStream: endStream,
1156+
rejectWithStatus: Status(
1157+
code: .invalidArgument,
1158+
message: ":method header is expected to be present and have a value of \"POST\"."
1159+
)
1160+
)
1161+
}
1162+
1163+
guard let te = metadata.firstString(forKey: .te), te == "trailers" else {
1164+
return self.closeServerAndBuildRejectRPCAction(
1165+
currentState: state,
1166+
endStream: endStream,
1167+
rejectWithStatus: Status(
1168+
code: .invalidArgument,
1169+
message: "\"te\" header is expected to be present and have a value of \"trailers\"."
1170+
)
10871171
)
1088-
let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
1089-
return .rejectRPC(trailers: trailers)
10901172
}
10911173

10921174
func isIdentityOrCompatibleEncoding(_ clientEncoding: CompressionAlgorithm) -> Bool {
@@ -1265,15 +1347,15 @@ extension GRPCStreamStateMachine {
12651347
self.state = .clientClosedServerOpen(state)
12661348
return response.map { .sendMessage($0) } ?? .awaitMoreMessages
12671349
case .clientOpenServerClosed(var state):
1268-
let response = try state.framer.next(compressor: state.compressor)
1350+
let response = try state.framer?.next(compressor: state.compressor)
12691351
self.state = .clientOpenServerClosed(state)
12701352
if let response {
12711353
return .sendMessage(response)
12721354
} else {
12731355
return .noMoreMessages
12741356
}
12751357
case .clientClosedServerClosed(var state):
1276-
let response = try state.framer.next(compressor: state.compressor)
1358+
let response = try state.framer?.next(compressor: state.compressor)
12771359
self.state = .clientClosedServerClosed(state)
12781360
if let response {
12791361
return .sendMessage(response)

Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift

Lines changed: 171 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ extension HPACKHeaders {
4545
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
4646
GRPCHTTP2Keys.method.rawValue: "POST",
4747
GRPCHTTP2Keys.scheme.rawValue: "https",
48-
GRPCHTTP2Keys.te.rawValue: "te",
48+
GRPCHTTP2Keys.te.rawValue: "trailers",
4949
GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
5050
GRPCHTTP2Keys.encoding.rawValue: "deflate",
5151
]
@@ -54,7 +54,7 @@ extension HPACKHeaders {
5454
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
5555
GRPCHTTP2Keys.method.rawValue: "POST",
5656
GRPCHTTP2Keys.scheme.rawValue: "https",
57-
GRPCHTTP2Keys.te.rawValue: "te",
57+
GRPCHTTP2Keys.te.rawValue: "trailers",
5858
GRPCHTTP2Keys.acceptEncoding.rawValue: "gzip",
5959
GRPCHTTP2Keys.encoding.rawValue: "gzip",
6060
]
@@ -68,6 +68,45 @@ extension HPACKHeaders {
6868
fileprivate static let receivedWithoutEndpoint: Self = [
6969
GRPCHTTP2Keys.contentType.rawValue: "application/grpc"
7070
]
71+
fileprivate static let receivedWithoutTE: Self = [
72+
GRPCHTTP2Keys.path.rawValue: "test/test",
73+
GRPCHTTP2Keys.scheme.rawValue: "http",
74+
GRPCHTTP2Keys.method.rawValue: "POST",
75+
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
76+
]
77+
fileprivate static let receivedWithInvalidTE: Self = [
78+
GRPCHTTP2Keys.path.rawValue: "test/test",
79+
GRPCHTTP2Keys.scheme.rawValue: "http",
80+
GRPCHTTP2Keys.method.rawValue: "POST",
81+
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
82+
GRPCHTTP2Keys.te.rawValue: "invalidte",
83+
]
84+
fileprivate static let receivedWithoutMethod: Self = [
85+
GRPCHTTP2Keys.path.rawValue: "test/test",
86+
GRPCHTTP2Keys.scheme.rawValue: "http",
87+
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
88+
GRPCHTTP2Keys.te.rawValue: "trailers",
89+
]
90+
fileprivate static let receivedWithInvalidMethod: Self = [
91+
GRPCHTTP2Keys.path.rawValue: "test/test",
92+
GRPCHTTP2Keys.scheme.rawValue: "http",
93+
GRPCHTTP2Keys.method.rawValue: "GET",
94+
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
95+
GRPCHTTP2Keys.te.rawValue: "trailers",
96+
]
97+
fileprivate static let receivedWithoutScheme: Self = [
98+
GRPCHTTP2Keys.path.rawValue: "test/test",
99+
GRPCHTTP2Keys.method.rawValue: "POST",
100+
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
101+
GRPCHTTP2Keys.te.rawValue: "trailers",
102+
]
103+
fileprivate static let receivedWithInvalidScheme: Self = [
104+
GRPCHTTP2Keys.path.rawValue: "test/test",
105+
GRPCHTTP2Keys.scheme.rawValue: "invalidscheme",
106+
GRPCHTTP2Keys.method.rawValue: "POST",
107+
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
108+
GRPCHTTP2Keys.te.rawValue: "trailers",
109+
]
71110

72111
// Server
73112
fileprivate static let serverInitialMetadata: Self = [
@@ -1502,6 +1541,136 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
15021541
}
15031542
}
15041543

1544+
func testReceiveMetadataWhenClientIdleAndServerIdle_MissingTE() throws {
1545+
var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
1546+
1547+
let action = try stateMachine.receive(
1548+
metadata: .receivedWithoutTE,
1549+
endStream: false
1550+
)
1551+
1552+
self.assertRejectedRPC(action) { trailers in
1553+
XCTAssertEqual(
1554+
trailers,
1555+
[
1556+
":status": "200",
1557+
"content-type": "application/grpc",
1558+
"grpc-status": "3",
1559+
"grpc-status-message":
1560+
"\"te\" header is expected to be present and have a value of \"trailers\".",
1561+
]
1562+
)
1563+
}
1564+
}
1565+
1566+
func testReceiveMetadataWhenClientIdleAndServerIdle_InvalidTE() throws {
1567+
var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
1568+
1569+
let action = try stateMachine.receive(
1570+
metadata: .receivedWithInvalidTE,
1571+
endStream: false
1572+
)
1573+
1574+
self.assertRejectedRPC(action) { trailers in
1575+
XCTAssertEqual(
1576+
trailers,
1577+
[
1578+
":status": "200",
1579+
"content-type": "application/grpc",
1580+
"grpc-status": "3",
1581+
"grpc-status-message":
1582+
"\"te\" header is expected to be present and have a value of \"trailers\".",
1583+
]
1584+
)
1585+
}
1586+
}
1587+
1588+
func testReceiveMetadataWhenClientIdleAndServerIdle_MissingMethod() throws {
1589+
var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
1590+
1591+
let action = try stateMachine.receive(
1592+
metadata: .receivedWithoutMethod,
1593+
endStream: false
1594+
)
1595+
1596+
self.assertRejectedRPC(action) { trailers in
1597+
XCTAssertEqual(
1598+
trailers,
1599+
[
1600+
":status": "200",
1601+
"content-type": "application/grpc",
1602+
"grpc-status": "3",
1603+
"grpc-status-message":
1604+
":method header is expected to be present and have a value of \"POST\".",
1605+
]
1606+
)
1607+
}
1608+
}
1609+
1610+
func testReceiveMetadataWhenClientIdleAndServerIdle_InvalidMethod() throws {
1611+
var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
1612+
1613+
let action = try stateMachine.receive(
1614+
metadata: .receivedWithInvalidMethod,
1615+
endStream: false
1616+
)
1617+
1618+
self.assertRejectedRPC(action) { trailers in
1619+
XCTAssertEqual(
1620+
trailers,
1621+
[
1622+
":status": "200",
1623+
"content-type": "application/grpc",
1624+
"grpc-status": "3",
1625+
"grpc-status-message":
1626+
":method header is expected to be present and have a value of \"POST\".",
1627+
]
1628+
)
1629+
}
1630+
}
1631+
1632+
func testReceiveMetadataWhenClientIdleAndServerIdle_MissingScheme() throws {
1633+
var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
1634+
1635+
let action = try stateMachine.receive(
1636+
metadata: .receivedWithoutScheme,
1637+
endStream: false
1638+
)
1639+
1640+
self.assertRejectedRPC(action) { trailers in
1641+
XCTAssertEqual(
1642+
trailers,
1643+
[
1644+
":status": "200",
1645+
"content-type": "application/grpc",
1646+
"grpc-status": "3",
1647+
"grpc-status-message": ":scheme header must be present and one of \"http\" or \"https\".",
1648+
]
1649+
)
1650+
}
1651+
}
1652+
1653+
func testReceiveMetadataWhenClientIdleAndServerIdle_InvalidScheme() throws {
1654+
var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
1655+
1656+
let action = try stateMachine.receive(
1657+
metadata: .receivedWithInvalidScheme,
1658+
endStream: false
1659+
)
1660+
1661+
self.assertRejectedRPC(action) { trailers in
1662+
XCTAssertEqual(
1663+
trailers,
1664+
[
1665+
":status": "200",
1666+
"content-type": "application/grpc",
1667+
"grpc-status": "3",
1668+
"grpc-status-message": ":scheme header must be present and one of \"http\" or \"https\".",
1669+
]
1670+
)
1671+
}
1672+
}
1673+
15051674
func testReceiveMetadataWhenClientIdleAndServerIdle_ServerUnsupportedEncoding() throws {
15061675
var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
15071676

0 commit comments

Comments
 (0)