Skip to content

Commit babcff1

Browse files
authored
Provide an error when cancelling async writer (#1456)
Motivation: The `AsyncWriter` fails any pending writes with a `CancellationError` if the writer is cancelled. This can be misleading if the writer is cancelled because a connection could not be established, for example. Modifications: - Pass an error through `AsyncWriter.cancel` Result: Pending writes which are failed because the writer has been cancelled have more relevant errors.
1 parent ac0a76d commit babcff1

File tree

6 files changed

+40
-21
lines changed

6 files changed

+40
-21
lines changed

Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,9 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
198198

199199
/// As ``cancel()`` but executed asynchronously.
200200
@usableFromInline
201-
internal nonisolated func cancelAsynchronously() {
201+
internal nonisolated func cancelAsynchronously(withError error: Error) {
202202
Task {
203-
await self.cancel()
203+
await self.cancel(withError: error)
204204
}
205205
}
206206

@@ -209,7 +209,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
209209
/// Any pending writes will be dropped and their continuations will be resumed with
210210
/// a `CancellationError`. Any writes after cancellation has completed will also fail.
211211
@usableFromInline
212-
internal func cancel() {
212+
internal func cancel(withError error: Error) {
213213
// If there's an end we should fail that last.
214214
let pendingEnd: PendingEnd?
215215

@@ -228,13 +228,11 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
228228
pendingEnd = nil
229229
}
230230

231-
let cancellationError = CancellationError()
232-
233231
while let pending = self._pendingElements.popFirst() {
234-
pending.continuation.resume(throwing: cancellationError)
232+
pending.continuation.resume(throwing: error)
235233
}
236234

237-
pendingEnd?.continuation.resume(throwing: cancellationError)
235+
pendingEnd?.continuation.resume(throwing: error)
238236
}
239237

240238
/// Write an `element`.
@@ -263,7 +261,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
263261
throw GRPCAsyncWriterError.tooManyPendingWrites
264262
}
265263
} onCancel: {
266-
self.cancelAsynchronously()
264+
self.cancelAsynchronously(withError: CancellationError())
267265
}
268266
}
269267

@@ -283,7 +281,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
283281
}
284282
}
285283
} onCancel: {
286-
self.cancelAsynchronously()
284+
self.cancelAsynchronously(withError: CancellationError())
287285
}
288286
}
289287
}

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
9292
onError: { error in
9393
asyncCall.responseParts.handleError(error)
9494
asyncCall.responseSource.finish(throwing: error)
95-
asyncCall.requestStream.asyncWriter.cancelAsynchronously()
95+
asyncCall.requestStream.asyncWriter.cancelAsynchronously(withError: error)
9696
},
9797
onResponsePart: AsyncCall.makeResponsePartHandler(
9898
responseParts: asyncCall.responseParts,
@@ -133,7 +133,7 @@ internal enum AsyncCall {
133133
responseSource.finish(throwing: status)
134134
}
135135

136-
requestStream?.asyncWriter.cancelAsynchronously()
136+
requestStream?.asyncWriter.cancelAsynchronously(withError: status)
137137
}
138138
}
139139
}
@@ -152,8 +152,8 @@ internal enum AsyncCall {
152152
switch responsePart {
153153
case .metadata, .message:
154154
()
155-
case .end:
156-
requestStream?.asyncWriter.cancelAsynchronously()
155+
case let .end(status, _):
156+
requestStream?.asyncWriter.cancelAsynchronously(withError: status)
157157
}
158158
}
159159
}

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public struct GRPCAsyncClientStreamingCall<Request: Sendable, Response: Sendable
9090
},
9191
onError: { error in
9292
asyncCall.responseParts.handleError(error)
93-
asyncCall.requestStream.asyncWriter.cancelAsynchronously()
93+
asyncCall.requestStream.asyncWriter.cancelAsynchronously(withError: error)
9494
},
9595
onResponsePart: AsyncCall.makeResponsePartHandler(
9696
responseParts: asyncCall.responseParts,

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ internal final class AsyncServerHandler<
487487
try await responseStreamWriter.finish(.ok)
488488
} catch {
489489
// Drop pending writes as we're on the error path.
490-
await responseStreamWriter.cancel()
490+
await responseStreamWriter.cancel(withError: error)
491491

492492
if let thrownStatus = error as? GRPCStatus, thrownStatus.isOk {
493493
throw GRPCStatus(code: .unknown, message: "Handler threw error with status code 'ok'.")
@@ -831,7 +831,7 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
831831
// written. This should reduce how long the user handler runs for as it can no longer do
832832
// anything useful.
833833
self.requestSource.finish(throwing: CancellationError())
834-
self.responseWriter.cancelAsynchronously()
834+
self.responseWriter.cancelAsynchronously(withError: CancellationError())
835835
self.task.cancel()
836836
}
837837
}

Tests/GRPCTests/AsyncAwaitSupport/AsyncClientTests.swift

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ final class AsyncClientCancellationTests: GRPCTestCase {
6464
return try self.makeClient(port: self.server.channel.localAddress!.port!)
6565
}
6666

67-
private func makeClient(port: Int) throws -> Echo_EchoAsyncClient {
67+
private func makeClient(
68+
port: Int,
69+
configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
70+
) throws -> Echo_EchoAsyncClient {
6871
precondition(self.pool == nil)
6972

7073
self.pool = try GRPCChannelPool.with(
@@ -73,6 +76,7 @@ final class AsyncClientCancellationTests: GRPCTestCase {
7376
eventLoopGroup: self.group
7477
) {
7578
$0.backgroundActivityLogger = self.clientLogger
79+
configure(&$0)
7680
}
7781

7882
return Echo_EchoAsyncClient(channel: self.pool)
@@ -394,6 +398,23 @@ final class AsyncClientCancellationTests: GRPCTestCase {
394398
return .bidirectionalStreaming(echo.makeUpdateCall())
395399
}
396400
}
401+
402+
func testConnectionFailureCancelsRequestStreamWithError() async throws {
403+
let echo = try self.makeClient(port: 0) {
404+
// Configure a short wait time; we will not start a server so fail quickly.
405+
$0.connectionPool.maxWaitTime = .milliseconds(10)
406+
}
407+
408+
let update = echo.makeUpdateCall()
409+
await XCTAssertThrowsError(try await update.requestStream.send(.init())) { error in
410+
XCTAssertFalse(error is CancellationError)
411+
}
412+
413+
let collect = echo.makeCollectCall()
414+
await XCTAssertThrowsError(try await collect.requestStream.send(.init())) { error in
415+
XCTAssertFalse(error is CancellationError)
416+
}
417+
}
397418
}
398419

399420
#endif // compiler(>=5.6)

Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ internal class AsyncWriterTests: GRPCTestCase {
170170

171171
async let pendingWrite: Void = writer.write("foo")
172172

173-
await writer.cancel()
173+
await writer.cancel(withError: CancellationError())
174174

175175
do {
176176
try await pendingWrite
@@ -202,7 +202,7 @@ internal class AsyncWriterTests: GRPCTestCase {
202202

203203
async let pendingWrite: Void = writer.finish(42)
204204

205-
await writer.cancel()
205+
await writer.cancel(withError: CancellationError())
206206

207207
do {
208208
try await pendingWrite
@@ -229,13 +229,13 @@ internal class AsyncWriterTests: GRPCTestCase {
229229
let delegate = CollectingDelegate<String, Int>()
230230
let writer = AsyncWriter(delegate: delegate)
231231

232-
await writer.cancel()
232+
await writer.cancel(withError: CancellationError())
233233
await XCTAssertThrowsError(try await writer.write("1")) { error in
234234
XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
235235
}
236236

237237
// Fine, no need to throw. Nothing should change.
238-
await writer.cancel()
238+
await writer.cancel(withError: CancellationError())
239239
await XCTAssertThrowsError(try await writer.write("2")) { error in
240240
XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished)
241241
}

0 commit comments

Comments
 (0)