Skip to content

Commit cfa950f

Browse files
authored
Allow to write a sequence of requests/responses (#1499)
* Allow to write a sequence of requests/responses # Motivation We recently adopted the `NIOAsyncWriter` to back the `GRPCAsyncRequestStreamWriter` and the `GRPCAsyncResponseStreamWriter`; however, we did not expose the functionality to write a sequence of the elements that the `NIOAsyncWriter` offers. This can be useful in cases where you want to write a batch of requests/responses since it reduces the amount of locks and thread hops. # Modification Expose new methods on the `GRPCAsyncRequestStreamWriter` and the `GRPCAsyncResponseStreamWriter` to enable to write a sequence. I also fixed up the comments for the `GRPCAsyncRequestStreamWriter` since they were outdated. # Result Users can write a batch of requests/responses. * Update tests
1 parent 4a1fab1 commit cfa950f

File tree

4 files changed

+66
-11
lines changed

4 files changed

+66
-11
lines changed

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,15 @@ public struct GRPCAsyncRequestStreamWriter<Request: Sendable>: Sendable {
4949

5050
/// Send a single request.
5151
///
52-
/// To ensure requests are delivered in order callers should `await` the result of this call
53-
/// before sending another request. Callers who do not need this guarantee do not have to `await`
54-
/// the completion of this call and may send messages concurrently from multiple ``Task``s.
55-
/// However, it is important to note that no more than 16 writes may be pending at any one time
56-
/// and attempting to exceed this will result in an ``GRPCAsyncWriterError/tooManyPendingWrites``
57-
/// error being thrown.
52+
/// It is safe to send multiple requests concurrently by sharing the ``GRPCAsyncRequestStreamWriter`` across tasks.
5853
///
5954
/// Callers must call ``finish()`` when they have no more requests left to send.
6055
///
6156
/// - Parameters:
6257
/// - request: The request to send.
6358
/// - compression: Whether the request should be compressed or not. Ignored if compression was
6459
/// not enabled for the RPC.
65-
/// - Throws: ``GRPCAsyncWriterError`` if there are too many pending writes or the request stream
66-
/// has already been finished.
60+
/// - Throws: If the request stream has already been finished.
6761
@inlinable
6862
public func send(
6963
_ request: Request,
@@ -72,6 +66,25 @@ public struct GRPCAsyncRequestStreamWriter<Request: Sendable>: Sendable {
7266
try await self.asyncWriter.yield((request, compression))
7367
}
7468

69+
/// Send a sequence of requests.
70+
///
71+
/// It is safe to send multiple requests concurrently by sharing the ``GRPCAsyncRequestStreamWriter`` across tasks.
72+
///
73+
/// Callers must call ``finish()`` when they have no more requests left to send.
74+
///
75+
/// - Parameters:
76+
/// - requests: The requests to send.
77+
/// - compression: Whether the requests should be compressed or not. Ignored if compression was
78+
/// not enabled for the RPC.
79+
/// - Throws: If the request stream has already been finished.
80+
@inlinable
81+
public func send<S: Sequence>(
82+
_ requests: S,
83+
compression: Compression = .deferToCallDefault
84+
) async throws where S.Element == Request {
85+
try await self.asyncWriter.yield(contentsOf: requests.lazy.map { ($0, compression) })
86+
}
87+
7588
/// Finish the request stream for the RPC. This must be called when there are no more requests to be sent.
7689
public func finish() async throws {
7790
self.asyncWriter.finish()

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,23 @@ public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
129129
}
130130
}
131131

132+
@inlinable
133+
public func send<S: Sequence>(
134+
contentsOf responses: S,
135+
compression: Compression = .deferToCallDefault
136+
) async throws where S.Element == Response {
137+
let responsesWithCompression = responses.lazy.map { ($0, compression) }
138+
switch self.backing {
139+
case let .asyncWriter(writer):
140+
try await writer.yield(contentsOf: responsesWithCompression)
141+
142+
case let .closure(closure):
143+
for response in responsesWithCompression {
144+
await closure(response)
145+
}
146+
}
147+
}
148+
132149
/// Creates a new `GRPCAsyncResponseStreamWriter` backed by a ``ResponseStream``.
133150
/// This is mostly useful for testing purposes where one wants to observe the written responses.
134151
///

Tests/GRPCTests/GRPCAsyncClientCallTests.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,17 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
132132
callOptions: .init()
133133
)
134134

135-
for word in ["boyle", "jeffers", "holt"] {
136-
try await update.requestStream.send(.with { $0.text = word })
135+
let requests = ["boyle", "jeffers", "holt"]
136+
.map { word in Echo_EchoRequest.with { $0.text = word } }
137+
for request in requests {
138+
try await update.requestStream.send(request)
137139
}
140+
try await update.requestStream.send(requests)
138141
try await update.requestStream.finish()
139142

140143
let numResponses = try await update.responseStream.map { _ in 1 }.reduce(0, +)
141144

142-
await assertThat(numResponses, .is(.equalTo(3)))
145+
await assertThat(numResponses, .is(.equalTo(6)))
143146
await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
144147
await assertThat(await update.status, .hasCode(.ok))
145148
}

Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,28 @@ class AsyncServerHandlerTests: GRPCTestCase {
230230
await responseStream.next().assertNil()
231231
}
232232

233+
func testResponseSequence() async throws {
234+
let handler = self.makeHandler { _, responseStreamWriter, _ in
235+
try await responseStreamWriter.send(contentsOf: ["1", "2", "3"])
236+
}
237+
defer {
238+
XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
239+
}
240+
241+
self.loop.execute {
242+
handler.receiveMetadata([:])
243+
handler.receiveEnd()
244+
}
245+
246+
let responseStream = self.recorder.responseSequence.makeAsyncIterator()
247+
await responseStream.next().assertMetadata { _ in }
248+
await responseStream.next().assertMessage()
249+
await responseStream.next().assertMessage()
250+
await responseStream.next().assertMessage()
251+
await responseStream.next().assertStatus { _, _ in }
252+
await responseStream.next().assertNil()
253+
}
254+
233255
func testThrowingDeserializer() async throws {
234256
let handler = AsyncServerHandler(
235257
context: self.makeCallHandlerContext(),

0 commit comments

Comments
 (0)