Skip to content

Commit 07233c5

Browse files
authored
Adopt NIOAsyncSequenceProducer in grpc-swift (#1477)
We want to make use of `NIOAsyncSequenceProducer` (or its throwing counterpart) instead of using `PassthroughMessageSequence` and `PassthroughMessageSource`. * Replaced usages of `PassthroughMessageSequence` and `PassthroughMessageSource` with `NIOThrowingAsyncSequenceProducer` grpc-swift now uses `NIOAsyncSequenceProducer`
1 parent 2bf2a16 commit 07233c5

10 files changed

+211
-484
lines changed

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
*/
1616
#if compiler(>=5.6)
1717

18+
import NIOCore
1819
import NIOHPACK
1920

2021
/// Async-await variant of ``BidirectionalStreamingCall``.
2122
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
2223
public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: Sendable>: Sendable {
2324
private let call: Call<Request, Response>
2425
private let responseParts: StreamingResponseParts<Response>
25-
private let responseSource: PassthroughMessageSource<Response, Error>
26+
private let responseSource: NIOThrowingAsyncSequenceProducer<
27+
Response,
28+
Error,
29+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
30+
GRPCAsyncSequenceProducerDelegate
31+
>.Source
2632

2733
/// A request stream writer for sending messages to the server.
2834
public let requestStream: GRPCAsyncRequestStreamWriter<Request>
@@ -80,8 +86,17 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
8086
private init(call: Call<Request, Response>) {
8187
self.call = call
8288
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
83-
self.responseSource = PassthroughMessageSource<Response, Error>()
84-
self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
89+
let sequence = NIOThrowingAsyncSequenceProducer<
90+
Response,
91+
Error,
92+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
93+
GRPCAsyncSequenceProducerDelegate
94+
>.makeSequence(
95+
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
96+
delegate: GRPCAsyncSequenceProducerDelegate()
97+
)
98+
self.responseSource = sequence.source
99+
self.responseStream = .init(sequence.sequence)
85100
self.requestStream = call.makeRequestStreamWriter()
86101
}
87102

@@ -96,7 +111,7 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
96111
},
97112
onError: { error in
98113
asyncCall.responseParts.handleError(error)
99-
asyncCall.responseSource.finish(throwing: error)
114+
asyncCall.responseSource.finish(error)
100115
asyncCall.requestStream.asyncWriter.cancelAsynchronously(withError: error)
101116
},
102117
onResponsePart: AsyncCall.makeResponsePartHandler(
@@ -114,7 +129,12 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
114129
internal enum AsyncCall {
115130
internal static func makeResponsePartHandler<Response, Request>(
116131
responseParts: StreamingResponseParts<Response>,
117-
responseSource: PassthroughMessageSource<Response, Error>,
132+
responseSource: NIOThrowingAsyncSequenceProducer<
133+
Response,
134+
Error,
135+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
136+
GRPCAsyncSequenceProducerDelegate
137+
>.Source,
118138
requestStream: GRPCAsyncRequestStreamWriter<Request>?,
119139
requestType: Request.Type = Request.self
120140
) -> (GRPCClientResponsePart<Response>) -> Void {
@@ -135,7 +155,7 @@ internal enum AsyncCall {
135155
if status.isOk {
136156
responseSource.finish()
137157
} else {
138-
responseSource.finish(throwing: status)
158+
responseSource.finish(status)
139159
}
140160

141161
requestStream?.asyncWriter.cancelAsynchronously(withError: status)

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616

1717
#if compiler(>=5.6)
18+
import NIOCore
19+
1820
/// A type for the stream of request messages send to a gRPC server method.
1921
///
2022
/// To enable testability this type provides a static ``GRPCAsyncRequestStream/makeTestingRequestStream()``
@@ -77,16 +79,26 @@ public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
7779

7880
@usableFromInline
7981
enum Backing: Sendable {
80-
case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>)
8182
case asyncStream(AsyncThrowingStream<Element, Error>)
83+
case nioThrowingAsyncSequence(NIOThrowingAsyncSequenceProducer<
84+
Element,
85+
Error,
86+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
87+
GRPCAsyncSequenceProducerDelegate
88+
>)
8289
}
8390

8491
@usableFromInline
8592
internal let backing: Backing
8693

8794
@inlinable
88-
internal init(_ sequence: PassthroughMessageSequence<Element, Error>) {
89-
self.backing = .passthroughMessageSequence(sequence)
95+
internal init(_ sequence: NIOThrowingAsyncSequenceProducer<
96+
Element,
97+
Error,
98+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
99+
GRPCAsyncSequenceProducerDelegate
100+
>) {
101+
self.backing = .nioThrowingAsyncSequence(sequence)
90102
}
91103

92104
@inlinable
@@ -112,18 +124,23 @@ public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
112124
@inlinable
113125
public func makeAsyncIterator() -> Iterator {
114126
switch self.backing {
115-
case let .passthroughMessageSequence(sequence):
116-
return Self.AsyncIterator(.passthroughMessageSequence(sequence.makeAsyncIterator()))
117127
case let .asyncStream(stream):
118128
return Self.AsyncIterator(.asyncStream(stream.makeAsyncIterator()))
129+
case let .nioThrowingAsyncSequence(sequence):
130+
return Self.AsyncIterator(.nioThrowingAsyncSequence(sequence.makeAsyncIterator()))
119131
}
120132
}
121133

122134
public struct Iterator: AsyncIteratorProtocol {
123135
@usableFromInline
124136
enum BackingIterator {
125-
case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>.Iterator)
126137
case asyncStream(AsyncThrowingStream<Element, Error>.Iterator)
138+
case nioThrowingAsyncSequence(NIOThrowingAsyncSequenceProducer<
139+
Element,
140+
Error,
141+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
142+
GRPCAsyncSequenceProducerDelegate
143+
>.AsyncIterator)
127144
}
128145

129146
@usableFromInline
@@ -137,12 +154,12 @@ public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
137154
@inlinable
138155
public mutating func next() async throws -> Element? {
139156
switch self.iterator {
140-
case let .passthroughMessageSequence(iterator):
141-
return try await iterator.next()
142157
case var .asyncStream(iterator):
143158
let element = try await iterator.next()
144159
self.iterator = .asyncStream(iterator)
145160
return element
161+
case let .nioThrowingAsyncSequence(iterator):
162+
return try await iterator.next()
146163
}
147164
}
148165
}

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,19 @@
1515
*/
1616
#if compiler(>=5.6)
1717

18+
import NIOCore
19+
1820
/// This is currently a wrapper around AsyncThrowingStream because we want to be
1921
/// able to swap out the implementation for something else in the future.
2022
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
2123
public struct GRPCAsyncResponseStream<Element: Sendable>: AsyncSequence {
2224
@usableFromInline
23-
internal typealias WrappedStream = PassthroughMessageSequence<Element, Error>
25+
internal typealias WrappedStream = NIOThrowingAsyncSequenceProducer<
26+
Element,
27+
Error,
28+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
29+
GRPCAsyncSequenceProducerDelegate
30+
>
2431

2532
@usableFromInline
2633
internal let stream: WrappedStream
@@ -52,7 +59,5 @@ public struct GRPCAsyncResponseStream<Element: Sendable>: AsyncSequence {
5259

5360
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
5461
extension GRPCAsyncResponseStream: Sendable {}
55-
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
56-
extension GRPCAsyncResponseStream.Iterator: Sendable {}
5762

5863
#endif
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2022, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if compiler(>=5.6)
18+
19+
import NIOCore
20+
21+
@usableFromInline
22+
internal struct GRPCAsyncSequenceProducerDelegate: NIOAsyncSequenceProducerDelegate {
23+
@inlinable
24+
internal init() {}
25+
26+
// TODO: this method will have to be implemented when we add support for backpressure.
27+
@inlinable
28+
internal func produceMore() {}
29+
30+
// TODO: this method will have to be implemented when we add support for backpressure.
31+
@inlinable
32+
internal func didTerminate() {}
33+
}
34+
35+
#endif

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,14 @@ internal final class AsyncServerHandler<
235235
GRPCAsyncServerCallContext
236236
) async throws -> Void
237237

238+
@usableFromInline
239+
internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
240+
Request,
241+
Error,
242+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
243+
GRPCAsyncSequenceProducerDelegate
244+
>
245+
238246
@inlinable
239247
internal init(
240248
context: CallHandlerContext,
@@ -422,7 +430,10 @@ internal final class AsyncServerHandler<
422430
contextProvider: self
423431
)
424432

425-
let requestSource = PassthroughMessageSource<Request, Error>()
433+
let sequenceProducer = AsyncSequenceProducer.makeSequence(
434+
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
435+
delegate: GRPCAsyncSequenceProducerDelegate()
436+
)
426437

427438
let writerDelegate = AsyncResponseStreamWriterDelegate(
428439
send: self.interceptResponseMessage(_:compression:),
@@ -448,12 +459,13 @@ internal final class AsyncServerHandler<
448459
// Update our state before invoke the handler.
449460
self.handlerStateMachine.handlerInvoked(requestHeaders: headers)
450461
self.handlerComponents = ServerHandlerComponents(
451-
requestSource: requestSource,
462+
requestSource: sequenceProducer.source,
452463
responseWriter: writer,
453464
task: promise.completeWithTask {
454465
// We don't have a task cancellation handler here: we do it in `self.cancel()`.
455466
try await self.invokeUserHandler(
456-
requestStreamSource: requestSource,
467+
sequence: sequenceProducer.sequence,
468+
sequenceSource: sequenceProducer.source,
457469
responseStreamWriter: writer,
458470
callContext: handlerContext
459471
)
@@ -468,18 +480,19 @@ internal final class AsyncServerHandler<
468480
@Sendable
469481
@usableFromInline
470482
internal func invokeUserHandler(
471-
requestStreamSource: PassthroughMessageSource<Request, Error>,
483+
sequence: AsyncSequenceProducer,
484+
sequenceSource: AsyncSequenceProducer.Source,
472485
responseStreamWriter: AsyncWriter<AsyncResponseStreamWriterDelegate<Response>>,
473486
callContext: GRPCAsyncServerCallContext
474487
) async throws {
475488
defer {
476489
// It's possible the user handler completed before the end of the request stream. We
477490
// explicitly finish it to drop any unconsumed inbound messages.
478-
requestStreamSource.finish()
491+
sequenceSource.finish()
479492
}
480493

481494
do {
482-
let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))
495+
let requestStream = GRPCAsyncRequestStream(sequence)
483496
let responseStream = GRPCAsyncResponseStreamWriter(wrapping: responseStreamWriter)
484497
try await self.userHandler(requestStream, responseStream, callContext)
485498

@@ -530,7 +543,7 @@ internal final class AsyncServerHandler<
530543
case .forward:
531544
switch self.handlerStateMachine.handleMessage() {
532545
case .forward:
533-
self.handlerComponents?.requestSource.yield(request)
546+
_ = self.handlerComponents?.requestSource.yield(request)
534547
case .cancel:
535548
self.cancel(error: nil)
536549
}
@@ -809,11 +822,21 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
809822
@usableFromInline
810823
internal let responseWriter: AsyncWriter<Delegate>
811824
@usableFromInline
812-
internal let requestSource: PassthroughMessageSource<Request, Error>
825+
internal let requestSource: NIOThrowingAsyncSequenceProducer<
826+
Request,
827+
Error,
828+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
829+
GRPCAsyncSequenceProducerDelegate
830+
>.Source
813831

814832
@inlinable
815833
init(
816-
requestSource: PassthroughMessageSource<Request, Error>,
834+
requestSource: NIOThrowingAsyncSequenceProducer<
835+
Request,
836+
Error,
837+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
838+
GRPCAsyncSequenceProducerDelegate
839+
>.Source,
817840
responseWriter: AsyncWriter<Delegate>,
818841
task: Task<Void, Never>
819842
) {
@@ -830,7 +853,7 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
830853
// to the request stream, and cancelling the writer will ensure no more responses are
831854
// written. This should reduce how long the user handler runs for as it can no longer do
832855
// anything useful.
833-
self.requestSource.finish(throwing: CancellationError())
856+
self.requestSource.finish()
834857
self.responseWriter.cancelAsynchronously(withError: CancellationError())
835858
self.task.cancel()
836859
}

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
*/
1616
#if compiler(>=5.6)
1717

18+
import NIOCore
1819
import NIOHPACK
1920

2021
/// Async-await variant of ``ServerStreamingCall``.
2122
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
2223
public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable> {
2324
private let call: Call<Request, Response>
2425
private let responseParts: StreamingResponseParts<Response>
25-
private let responseSource: PassthroughMessageSource<Response, Error>
26+
private let responseSource: NIOThrowingAsyncSequenceProducer<
27+
Response,
28+
Error,
29+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
30+
GRPCAsyncSequenceProducerDelegate
31+
>.Source
2632

2733
/// The stream of responses from the server.
2834
public let responseStream: GRPCAsyncResponseStream<Response>
@@ -79,8 +85,17 @@ public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable
7985
// We ignore messages in the closure and instead feed them into the response source when we
8086
// invoke the `call`.
8187
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
82-
self.responseSource = PassthroughMessageSource<Response, Error>()
83-
self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
88+
let sequence = NIOThrowingAsyncSequenceProducer<
89+
Response,
90+
Error,
91+
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
92+
GRPCAsyncSequenceProducerDelegate
93+
>.makeSequence(
94+
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
95+
delegate: GRPCAsyncSequenceProducerDelegate()
96+
)
97+
self.responseSource = sequence.source
98+
self.responseStream = .init(sequence.sequence)
8499
}
85100

86101
/// We expose this as the only non-private initializer so that the caller
@@ -96,7 +111,7 @@ public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable
96111
onStart: {},
97112
onError: { error in
98113
asyncCall.responseParts.handleError(error)
99-
asyncCall.responseSource.finish(throwing: error)
114+
asyncCall.responseSource.finish(error)
100115
},
101116
onResponsePart: AsyncCall.makeResponsePartHandler(
102117
responseParts: asyncCall.responseParts,

0 commit comments

Comments
 (0)