Skip to content

Commit 2d62bdf

Browse files
authored
Add GRPCServerStreamHandler (#1832)
1 parent 800fadc commit 2d62bdf

File tree

3 files changed

+1007
-1
lines changed

3 files changed

+1007
-1
lines changed

Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ struct GRPCStreamStateMachine {
332332
case .server:
333333
if endStream {
334334
try self.invalidState(
335-
"Can't end response stream by sending a message - send(status:metadata:trailersOnly:) must be called"
335+
"Can't end response stream by sending a message - send(status:metadata:) must be called"
336336
)
337337
}
338338
try self.serverSend(message: message)
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import GRPCCore
18+
import NIOCore
19+
import NIOHTTP2
20+
21+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
22+
final class GRPCServerStreamHandler: ChannelDuplexHandler {
23+
typealias InboundIn = HTTP2Frame.FramePayload
24+
typealias InboundOut = RPCRequestPart
25+
26+
typealias OutboundIn = RPCResponsePart
27+
typealias OutboundOut = HTTP2Frame.FramePayload
28+
29+
private var stateMachine: GRPCStreamStateMachine
30+
31+
private var isReading = false
32+
private var flushPending = false
33+
34+
// We buffer the final status + trailers to avoid reordering issues (i.e.,
35+
// if there are messages still not written into the channel because flush has
36+
// not been called, but the server sends back trailers).
37+
private var pendingTrailers: HTTP2Frame.FramePayload?
38+
39+
init(
40+
scheme: Scheme,
41+
acceptedEncodings: [CompressionAlgorithm],
42+
maximumPayloadSize: Int,
43+
skipStateMachineAssertions: Bool = false
44+
) {
45+
self.stateMachine = .init(
46+
configuration: .server(.init(scheme: scheme, acceptedEncodings: acceptedEncodings)),
47+
maximumPayloadSize: maximumPayloadSize,
48+
skipAssertions: skipStateMachineAssertions
49+
)
50+
}
51+
}
52+
53+
// - MARK: ChannelInboundHandler
54+
55+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
56+
extension GRPCServerStreamHandler {
57+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
58+
self.isReading = true
59+
let frame = self.unwrapInboundIn(data)
60+
switch frame {
61+
case .data(let frameData):
62+
let endStream = frameData.endStream
63+
switch frameData.data {
64+
case .byteBuffer(let buffer):
65+
do {
66+
try self.stateMachine.receive(message: buffer, endStream: endStream)
67+
loop: while true {
68+
switch self.stateMachine.nextInboundMessage() {
69+
case .receiveMessage(let message):
70+
context.fireChannelRead(self.wrapInboundOut(.message(message)))
71+
case .awaitMoreMessages:
72+
break loop
73+
case .noMoreMessages:
74+
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
75+
break loop
76+
}
77+
}
78+
} catch {
79+
context.fireErrorCaught(error)
80+
}
81+
82+
case .fileRegion:
83+
preconditionFailure("Unexpected IOData.fileRegion")
84+
}
85+
86+
case .headers(let headers):
87+
do {
88+
let action = try self.stateMachine.receive(
89+
metadata: headers.headers,
90+
endStream: headers.endStream
91+
)
92+
switch action {
93+
case .receivedMetadata(let metadata):
94+
context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
95+
96+
case .rejectRPC(let trailers):
97+
self.flushPending = true
98+
let response = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
99+
context.write(self.wrapOutboundOut(response), promise: nil)
100+
101+
case .receivedStatusAndMetadata:
102+
throw RPCError(
103+
code: .internalError,
104+
message: "Server cannot get receivedStatusAndMetadata."
105+
)
106+
107+
case .doNothing:
108+
throw RPCError(code: .internalError, message: "Server cannot receive doNothing.")
109+
}
110+
} catch {
111+
context.fireErrorCaught(error)
112+
}
113+
114+
case .ping, .goAway, .priority, .rstStream, .settings, .pushPromise, .windowUpdate,
115+
.alternativeService, .origin:
116+
()
117+
}
118+
}
119+
120+
func channelReadComplete(context: ChannelHandlerContext) {
121+
self.isReading = false
122+
if self.flushPending {
123+
self.flushPending = false
124+
context.flush()
125+
}
126+
context.fireChannelReadComplete()
127+
}
128+
129+
func handlerRemoved(context: ChannelHandlerContext) {
130+
self.stateMachine.tearDown()
131+
}
132+
}
133+
134+
// - MARK: ChannelOutboundHandler
135+
136+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
137+
extension GRPCServerStreamHandler {
138+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
139+
let frame = self.unwrapOutboundIn(data)
140+
switch frame {
141+
case .metadata(let metadata):
142+
do {
143+
self.flushPending = true
144+
let headers = try self.stateMachine.send(metadata: metadata)
145+
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil)
146+
// TODO: move the promise handling into the state machine
147+
promise?.succeed()
148+
} catch {
149+
context.fireErrorCaught(error)
150+
// TODO: move the promise handling into the state machine
151+
promise?.fail(error)
152+
}
153+
154+
case .message(let message):
155+
do {
156+
try self.stateMachine.send(message: message, endStream: false)
157+
// TODO: move the promise handling into the state machine
158+
promise?.succeed()
159+
} catch {
160+
context.fireErrorCaught(error)
161+
// TODO: move the promise handling into the state machine
162+
promise?.fail(error)
163+
}
164+
165+
case .status(let status, let metadata):
166+
do {
167+
let headers = try self.stateMachine.send(status: status, metadata: metadata)
168+
let response = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true))
169+
self.pendingTrailers = response
170+
// TODO: move the promise handling into the state machine
171+
promise?.succeed()
172+
} catch {
173+
context.fireErrorCaught(error)
174+
// TODO: move the promise handling into the state machine
175+
promise?.fail(error)
176+
}
177+
}
178+
}
179+
180+
func flush(context: ChannelHandlerContext) {
181+
if self.isReading {
182+
// We don't want to flush yet if we're still in a read loop.
183+
return
184+
}
185+
186+
do {
187+
loop: while true {
188+
switch try self.stateMachine.nextOutboundMessage() {
189+
case .sendMessage(let byteBuffer):
190+
self.flushPending = true
191+
context.write(
192+
self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))),
193+
promise: nil
194+
)
195+
196+
case .noMoreMessages:
197+
if let pendingTrailers = self.pendingTrailers {
198+
self.flushPending = true
199+
self.pendingTrailers = nil
200+
context.write(self.wrapOutboundOut(pendingTrailers), promise: nil)
201+
}
202+
break loop
203+
204+
case .awaitMoreMessages:
205+
break loop
206+
}
207+
}
208+
209+
if self.flushPending {
210+
self.flushPending = false
211+
context.flush()
212+
}
213+
} catch {
214+
context.fireErrorCaught(error)
215+
}
216+
}
217+
}

0 commit comments

Comments
 (0)