Skip to content

Commit f3380d2

Browse files
authored
Factor out shared http2 server transport code (#2058)
Motivation: The NIOPosix and NIOTS HTTP/2 server transports have quite a lot of shared code. The only meaningfull difference between them is how they create their channels. Modification: - Factor out the shared parts to a `CommonHTTP2ServerTransport` - Allow each tranport to inject a factory for creating a listener Result: Less duplication
1 parent c51784e commit f3380d2

File tree

5 files changed

+419
-492
lines changed

5 files changed

+419
-492
lines changed

[email protected]

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ extension Target {
251251
.nioCore,
252252
.nioHTTP2,
253253
.nioTLS,
254+
.nioExtras,
254255
.cgrpcZlib,
255256
.dequeModule,
256257
],
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package import GRPCCore
18+
package import NIOCore
19+
package import NIOExtras
20+
private import NIOHTTP2
21+
private import Synchronization
22+
23+
/// Provides the common functionality for a `NIO`-based server transport.
24+
///
25+
/// - SeeAlso: ``HTTP2ListenerFactory``.
26+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
27+
package final class CommonHTTP2ServerTransport<
28+
ListenerFactory: HTTP2ListenerFactory
29+
>: ServerTransport, ListeningServerTransport {
30+
private let eventLoopGroup: any EventLoopGroup
31+
private let address: SocketAddress
32+
private let listeningAddressState: Mutex<State>
33+
private let serverQuiescingHelper: ServerQuiescingHelper
34+
private let factory: ListenerFactory
35+
36+
private enum State {
37+
case idle(EventLoopPromise<SocketAddress>)
38+
case listening(EventLoopFuture<SocketAddress>)
39+
case closedOrInvalidAddress(RuntimeError)
40+
41+
var listeningAddressFuture: EventLoopFuture<SocketAddress> {
42+
get throws {
43+
switch self {
44+
case .idle(let eventLoopPromise):
45+
return eventLoopPromise.futureResult
46+
case .listening(let eventLoopFuture):
47+
return eventLoopFuture
48+
case .closedOrInvalidAddress(let runtimeError):
49+
throw runtimeError
50+
}
51+
}
52+
}
53+
54+
enum OnBound {
55+
case succeedPromise(_ promise: EventLoopPromise<SocketAddress>, address: SocketAddress)
56+
case failPromise(_ promise: EventLoopPromise<SocketAddress>, error: RuntimeError)
57+
}
58+
59+
mutating func addressBound(
60+
_ address: NIOCore.SocketAddress?,
61+
userProvidedAddress: SocketAddress
62+
) -> OnBound {
63+
switch self {
64+
case .idle(let listeningAddressPromise):
65+
if let address {
66+
self = .listening(listeningAddressPromise.futureResult)
67+
return .succeedPromise(listeningAddressPromise, address: SocketAddress(address))
68+
} else if userProvidedAddress.virtualSocket != nil {
69+
self = .listening(listeningAddressPromise.futureResult)
70+
return .succeedPromise(listeningAddressPromise, address: userProvidedAddress)
71+
} else {
72+
assertionFailure("Unknown address type")
73+
let invalidAddressError = RuntimeError(
74+
code: .transportError,
75+
message: "Unknown address type returned by transport."
76+
)
77+
self = .closedOrInvalidAddress(invalidAddressError)
78+
return .failPromise(listeningAddressPromise, error: invalidAddressError)
79+
}
80+
81+
case .listening, .closedOrInvalidAddress:
82+
fatalError("Invalid state: addressBound should only be called once and when in idle state")
83+
}
84+
}
85+
86+
enum OnClose {
87+
case failPromise(EventLoopPromise<SocketAddress>, error: RuntimeError)
88+
case doNothing
89+
}
90+
91+
mutating func close() -> OnClose {
92+
let serverStoppedError = RuntimeError(
93+
code: .serverIsStopped,
94+
message: """
95+
There is no listening address bound for this server: there may have been \
96+
an error which caused the transport to close, or it may have shut down.
97+
"""
98+
)
99+
100+
switch self {
101+
case .idle(let listeningAddressPromise):
102+
self = .closedOrInvalidAddress(serverStoppedError)
103+
return .failPromise(listeningAddressPromise, error: serverStoppedError)
104+
105+
case .listening:
106+
self = .closedOrInvalidAddress(serverStoppedError)
107+
return .doNothing
108+
109+
case .closedOrInvalidAddress:
110+
return .doNothing
111+
}
112+
}
113+
}
114+
115+
/// The listening address for this server transport.
116+
///
117+
/// It is an `async` property because it will only return once the address has been successfully bound.
118+
///
119+
/// - Throws: A runtime error will be thrown if the address could not be bound or is not bound any
120+
/// longer, because the transport isn't listening anymore. It can also throw if the transport returned an
121+
/// invalid address.
122+
package var listeningAddress: SocketAddress {
123+
get async throws {
124+
try await self.listeningAddressState
125+
.withLock { try $0.listeningAddressFuture }
126+
.get()
127+
}
128+
}
129+
130+
package init(
131+
address: SocketAddress,
132+
eventLoopGroup: any EventLoopGroup,
133+
quiescingHelper: ServerQuiescingHelper,
134+
listenerFactory: ListenerFactory
135+
) {
136+
self.eventLoopGroup = eventLoopGroup
137+
self.address = address
138+
139+
let eventLoop = eventLoopGroup.any()
140+
self.listeningAddressState = Mutex(.idle(eventLoop.makePromise()))
141+
142+
self.factory = listenerFactory
143+
self.serverQuiescingHelper = quiescingHelper
144+
}
145+
146+
package func listen(
147+
_ streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
148+
) async throws {
149+
defer {
150+
switch self.listeningAddressState.withLock({ $0.close() }) {
151+
case .failPromise(let promise, let error):
152+
promise.fail(error)
153+
case .doNothing:
154+
()
155+
}
156+
}
157+
158+
let serverChannel = try await self.factory.makeListeningChannel(
159+
eventLoopGroup: self.eventLoopGroup,
160+
address: self.address,
161+
serverQuiescingHelper: self.serverQuiescingHelper
162+
)
163+
164+
let action = self.listeningAddressState.withLock {
165+
$0.addressBound(
166+
serverChannel.channel.localAddress,
167+
userProvidedAddress: self.address
168+
)
169+
}
170+
switch action {
171+
case .succeedPromise(let promise, let address):
172+
promise.succeed(address)
173+
case .failPromise(let promise, let error):
174+
promise.fail(error)
175+
}
176+
177+
try await serverChannel.executeThenClose { inbound in
178+
try await withThrowingDiscardingTaskGroup { group in
179+
for try await (connectionChannel, streamMultiplexer) in inbound {
180+
group.addTask {
181+
try await self.handleConnection(
182+
connectionChannel,
183+
multiplexer: streamMultiplexer,
184+
streamHandler: streamHandler
185+
)
186+
}
187+
}
188+
}
189+
}
190+
}
191+
192+
private func handleConnection(
193+
_ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
194+
multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
195+
streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
196+
) async throws {
197+
try await connection.executeThenClose { inbound, _ in
198+
await withDiscardingTaskGroup { group in
199+
group.addTask {
200+
do {
201+
for try await _ in inbound {}
202+
} catch {
203+
// We don't want to close the channel if one connection throws.
204+
return
205+
}
206+
}
207+
208+
do {
209+
for try await (stream, descriptor) in multiplexer.inbound {
210+
group.addTask {
211+
await self.handleStream(stream, handler: streamHandler, descriptor: descriptor)
212+
}
213+
}
214+
} catch {
215+
return
216+
}
217+
}
218+
}
219+
}
220+
221+
private func handleStream(
222+
_ stream: NIOAsyncChannel<RPCRequestPart, RPCResponsePart>,
223+
handler streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void,
224+
descriptor: EventLoopFuture<MethodDescriptor>
225+
) async {
226+
// It's okay to ignore these errors:
227+
// - If we get an error because the http2Stream failed to close, then there's nothing we can do
228+
// - If we get an error because the inner closure threw, then the only possible scenario in which
229+
// that could happen is if methodDescriptor.get() throws - in which case, it means we never got
230+
// the RPC metadata, which means we can't do anything either and it's okay to just kill the stream.
231+
try? await stream.executeThenClose { inbound, outbound in
232+
guard let descriptor = try? await descriptor.get() else {
233+
return
234+
}
235+
236+
let rpcStream = RPCStream(
237+
descriptor: descriptor,
238+
inbound: RPCAsyncSequence(wrapping: inbound),
239+
outbound: RPCWriter.Closable(
240+
wrapping: ServerConnection.Stream.Outbound(
241+
responseWriter: outbound,
242+
http2Stream: stream
243+
)
244+
)
245+
)
246+
247+
await streamHandler(rpcStream)
248+
}
249+
}
250+
251+
package func beginGracefulShutdown() {
252+
self.serverQuiescingHelper.initiateShutdown(promise: nil)
253+
}
254+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package import NIOCore
18+
package import NIOExtras
19+
20+
/// A factory to produce `NIOAsyncChannel`s to listen for new HTTP/2 connections.
21+
///
22+
/// - SeeAlso: ``CommonHTTP2ServerTransport``
23+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
24+
package protocol HTTP2ListenerFactory: Sendable {
25+
typealias AcceptedChannel = (
26+
ChannelPipeline.SynchronousOperations.HTTP2ConnectionChannel,
27+
ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer
28+
)
29+
30+
func makeListeningChannel(
31+
eventLoopGroup: any EventLoopGroup,
32+
address: SocketAddress,
33+
serverQuiescingHelper: ServerQuiescingHelper
34+
) async throws -> NIOAsyncChannel<AcceptedChannel, Never>
35+
}

0 commit comments

Comments
 (0)