Skip to content

Commit 4d5d6fe

Browse files
authored
Add support for RPC cancellation (#13)
Motivation: grpc-swift has support for RPC cancellation via a cancellation handler. It should be supported here as well. Modifications: - Modify the server stream handler so that it holds the cancellation handle and triggers cancellation appropriately. - Modify the server stream handling to set a cancellation handle on the appropriate stream handler. - Update tests Result: HTTP/2 server transport respects stream cancellation
1 parent 350aa4f commit 4d5d6fe

File tree

11 files changed

+327
-177
lines changed

11 files changed

+327
-177
lines changed

Package.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ let products: [Product] = [
3535
let dependencies: [Package.Dependency] = [
3636
.package(
3737
url: "https://github.com/grpc/grpc-swift.git",
38-
exact: "2.0.0-alpha.1"
38+
branch: "main"
3939
),
4040
.package(
4141
url: "https://github.com/apple/swift-nio.git",
4242
from: "2.65.0"
4343
),
4444
.package(
4545
url: "https://github.com/apple/swift-nio-http2.git",
46-
from: "1.32.0"
46+
from: "1.34.1"
4747
),
4848
.package(
4949
url: "https://github.com/apple/swift-nio-transport-services.git",

Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ extension ChannelPipeline.SynchronousOperations {
8585
scheme: scheme,
8686
acceptedEncodings: compressionConfig.enabledAlgorithms,
8787
maxPayloadSize: rpcConfig.maxRequestPayloadSize,
88-
methodDescriptorPromise: methodDescriptorPromise
88+
methodDescriptorPromise: methodDescriptorPromise,
89+
eventLoop: streamChannel.eventLoop
8990
)
9091
try streamChannel.pipeline.syncOperations.addHandler(streamHandler)
9192

Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -241,19 +241,35 @@ package final class CommonHTTP2ServerTransport<
241241
return
242242
}
243243

244-
let rpcStream = RPCStream(
245-
descriptor: descriptor,
246-
inbound: RPCAsyncSequence(wrapping: inbound),
247-
outbound: RPCWriter.Closable(
248-
wrapping: ServerConnection.Stream.Outbound(
249-
responseWriter: outbound,
250-
http2Stream: stream
244+
await withServerContextRPCCancellationHandle { handle in
245+
stream.channel.eventLoop.execute {
246+
// Sync is safe: this is on the right event loop.
247+
let sync = stream.channel.pipeline.syncOperations
248+
249+
do {
250+
let handler = try sync.handler(type: GRPCServerStreamHandler.self)
251+
handler.setCancellationHandle(handle)
252+
} catch {
253+
// Looking up the handler can fail if the channel is already closed, in which case
254+
// don't execute the RPC, just return early.
255+
return
256+
}
257+
}
258+
259+
let rpcStream = RPCStream(
260+
descriptor: descriptor,
261+
inbound: RPCAsyncSequence(wrapping: inbound),
262+
outbound: RPCWriter.Closable(
263+
wrapping: ServerConnection.Stream.Outbound(
264+
responseWriter: outbound,
265+
http2Stream: stream
266+
)
251267
)
252268
)
253-
)
254269

255-
let context = ServerContext(descriptor: descriptor)
256-
await streamHandler(rpcStream, context)
270+
let context = ServerContext(descriptor: descriptor, cancellation: handle)
271+
await streamHandler(rpcStream, context)
272+
}
257273
}
258274
}
259275

Sources/GRPCNIOTransportCore/Server/GRPCServerStreamHandler.swift

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ package final class GRPCServerStreamHandler: ChannelDuplexHandler, RemovableChan
2626
package typealias OutboundOut = HTTP2Frame.FramePayload
2727

2828
private var stateMachine: GRPCStreamStateMachine
29+
private let eventLoop: any EventLoop
2930

3031
private var isReading = false
3132
private var flushPending = false
33+
private var isCancelled = false
3234

3335
// We buffer the final status + trailers to avoid reordering issues (i.e.,
3436
// if there are messages still not written into the channel because flush has
@@ -38,6 +40,8 @@ package final class GRPCServerStreamHandler: ChannelDuplexHandler, RemovableChan
3840

3941
private let methodDescriptorPromise: EventLoopPromise<MethodDescriptor>
4042

43+
private var cancellationHandle: Optional<ServerContext.RPCCancellationHandle>
44+
4145
// Existential errors unconditionally allocate, avoid this per-use allocation by doing it
4246
// statically.
4347
private static let handlerRemovedBeforeDescriptorResolved: any Error = RPCError(
@@ -50,6 +54,8 @@ package final class GRPCServerStreamHandler: ChannelDuplexHandler, RemovableChan
5054
acceptedEncodings: CompressionAlgorithmSet,
5155
maxPayloadSize: Int,
5256
methodDescriptorPromise: EventLoopPromise<MethodDescriptor>,
57+
eventLoop: any EventLoop,
58+
cancellationHandler: ServerContext.RPCCancellationHandle? = nil,
5359
skipStateMachineAssertions: Bool = false
5460
) {
5561
self.stateMachine = .init(
@@ -58,12 +64,54 @@ package final class GRPCServerStreamHandler: ChannelDuplexHandler, RemovableChan
5864
skipAssertions: skipStateMachineAssertions
5965
)
6066
self.methodDescriptorPromise = methodDescriptorPromise
67+
self.cancellationHandle = cancellationHandler
68+
self.eventLoop = eventLoop
69+
}
70+
71+
package func setCancellationHandle(_ handle: ServerContext.RPCCancellationHandle) {
72+
if self.eventLoop.inEventLoop {
73+
self.syncSetCancellationHandle(handle)
74+
} else {
75+
let loopBoundSelf = NIOLoopBound(self, eventLoop: self.eventLoop)
76+
self.eventLoop.execute {
77+
loopBoundSelf.value.syncSetCancellationHandle(handle)
78+
}
79+
}
80+
}
81+
82+
private func syncSetCancellationHandle(_ handle: ServerContext.RPCCancellationHandle) {
83+
assert(self.cancellationHandle == nil, "\(#function) must only be called once")
84+
85+
if self.isCancelled {
86+
handle.cancel()
87+
} else {
88+
self.cancellationHandle = handle
89+
}
90+
}
91+
92+
private func cancelRPC() {
93+
if let handle = self.cancellationHandle.take() {
94+
handle.cancel()
95+
} else {
96+
self.isCancelled = true
97+
}
6198
}
6299
}
63100

64101
// - MARK: ChannelInboundHandler
65102

66103
extension GRPCServerStreamHandler {
104+
package func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
105+
switch event {
106+
case is ChannelShouldQuiesceEvent:
107+
self.cancelRPC()
108+
default:
109+
()
110+
}
111+
112+
context.fireUserInboundEventTriggered(event)
113+
}
114+
67115
package func channelRead(context: ChannelHandlerContext, data: NIOAny) {
68116
self.isReading = true
69117
let frame = self.unwrapInboundIn(data)
@@ -186,6 +234,7 @@ extension GRPCServerStreamHandler {
186234
) {
187235
switch self.stateMachine.unexpectedInboundClose(reason: reason) {
188236
case .fireError_serverOnly(let wrappedError):
237+
self.cancelRPC()
189238
context.fireErrorCaught(wrappedError)
190239
case .doNothing:
191240
()

Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/ConnectionTest.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ extension ConnectionTest {
117117
scheme: .http,
118118
acceptedEncodings: .none,
119119
maxPayloadSize: .max,
120-
methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
120+
methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
121+
eventLoop: stream.eventLoop
121122
)
122123

123124
return stream.eventLoop.makeCompletedFuture {

Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/TestServer.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ final class TestServer: Sendable {
7474
scheme: .http,
7575
acceptedEncodings: .all,
7676
maxPayloadSize: .max,
77-
methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
77+
methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
78+
eventLoop: stream.eventLoop
7879
)
7980

8081
try stream.pipeline.syncOperations.addHandlers(handler)

0 commit comments

Comments
 (0)