Skip to content

Commit 04ef584

Browse files
Convert errors thrown from interceptor inbound or outbound stream (#3)
* Convert errors thrown from interceptor inbound or outbound stream * Introduce castOrConvertRPCError helper * Simplify unit tests * Use castOrConvert in another palce and add unit tests * Format and lint
1 parent a011e71 commit 04ef584

File tree

10 files changed

+256
-14
lines changed

10 files changed

+256
-14
lines changed

Sources/GRPCCore/Call/Client/Internal/ClientResponse+Convenience.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ extension ClientResponse {
7171
} catch let error as RPCError {
7272
// Known error type.
7373
self.accepted = .success(Contents(metadata: contents.metadata, error: error))
74+
} catch let error as any RPCErrorConvertible {
75+
self.accepted = .success(Contents(metadata: contents.metadata, error: RPCError(error)))
7476
} catch {
7577
// Unexpected, but should be handled nonetheless.
7678
self.accepted = .failure(RPCError(code: .unknown, message: String(describing: error)))

Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal enum ClientStreamExecutor {
2626
/// - attempt: The attempt number for the RPC that will be executed.
2727
/// - serializer: A request serializer.
2828
/// - deserializer: A response deserializer.
29-
/// - stream: The stream to excecute the RPC on.
29+
/// - stream: The stream to execute the RPC on.
3030
/// - Returns: A streamed response.
3131
@inlinable
3232
static func execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
@@ -95,7 +95,7 @@ internal enum ClientStreamExecutor {
9595
let result = await Result {
9696
try await stream.write(.metadata(request.metadata))
9797
try await request.producer(.map(into: stream) { .message(try serializer.serialize($0)) })
98-
}.castError(to: RPCError.self) { other in
98+
}.castOrConvertRPCError { other in
9999
RPCError(code: .unknown, message: "Write failed.", cause: other)
100100
}
101101

Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -188,16 +188,12 @@ struct ServerRPCExecutor {
188188
) { request, context in
189189
try await handler(request, context)
190190
}
191-
}.castError(to: RPCError.self) { error in
192-
if let convertible = error as? (any RPCErrorConvertible) {
193-
return RPCError(convertible)
194-
} else {
195-
return RPCError(
196-
code: .unknown,
197-
message: "Service method threw an unknown error.",
198-
cause: error
199-
)
200-
}
191+
}.castOrConvertRPCError { error in
192+
RPCError(
193+
code: .unknown,
194+
message: "Service method threw an unknown error.",
195+
cause: error
196+
)
201197
}.flatMap { response in
202198
response.accepted
203199
}
@@ -213,7 +209,7 @@ struct ServerRPCExecutor {
213209
return try await contents.producer(
214210
.serializingToRPCResponsePart(into: outbound, with: serializer)
215211
)
216-
}.castError(to: RPCError.self) { error in
212+
}.castOrConvertRPCError { error in
217213
RPCError(code: .unknown, message: "", cause: error)
218214
}
219215

Sources/GRPCCore/Internal/Result+Catching.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,23 @@ extension Result {
4545
return (error as? NewError) ?? buildError(error)
4646
}
4747
}
48+
49+
/// Attempt to map or convert the error to an `RPCError`.
50+
///
51+
/// If the cast or conversion is not possible then the provided closure is used to create an error of the given type.
52+
///
53+
/// - Parameter buildError: A closure which constructs the desired error if conversion is not possible.
54+
@inlinable
55+
@available(gRPCSwift 2.0, *)
56+
func castOrConvertRPCError(
57+
or buildError: (any Error) -> RPCError
58+
) -> Result<Success, RPCError> {
59+
return self.castError(to: RPCError.self) { error in
60+
if let convertible = error as? any RPCErrorConvertible {
61+
return RPCError(convertible)
62+
} else {
63+
return buildError(error)
64+
}
65+
}
66+
}
4867
}

Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ struct ClientRPCExecutorTestHarness {
132132
try await withThrowingTaskGroup(of: Void.self) { group in
133133
group.addTask {
134134
try await self.serverTransport.listen { stream, context in
135-
try? await self.server.handle(stream: stream)
135+
do {
136+
try await self.server.handle(stream: stream)
137+
} catch {
138+
await stream.outbound.finish(throwing: error)
139+
}
136140
}
137141
}
138142

Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests.swift

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,4 +290,46 @@ final class ClientRPCExecutorTests: XCTestCase {
290290
}
291291
}
292292
}
293+
294+
func testInterceptorProducerErrorConversion() async throws {
295+
struct CustomError: RPCErrorConvertible, Error {
296+
var rpcErrorCode: RPCError.Code { .alreadyExists }
297+
var rpcErrorMessage: String { "foobar" }
298+
var rpcErrorMetadata: Metadata { ["error": "yes"] }
299+
}
300+
301+
let tester = ClientRPCExecutorTestHarness(
302+
server: .echo,
303+
interceptors: [.throwInProducer(CustomError())]
304+
)
305+
306+
try await tester.unary(request: ClientRequest(message: [])) { response in
307+
XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
308+
XCTAssertEqual(error.code, .alreadyExists)
309+
XCTAssertEqual(error.message, "foobar")
310+
XCTAssertEqual(error.metadata, ["error": "yes"])
311+
}
312+
}
313+
}
314+
315+
func testInterceptorBodyPartsErrorConversion() async throws {
316+
struct CustomError: RPCErrorConvertible, Error {
317+
var rpcErrorCode: RPCError.Code { .alreadyExists }
318+
var rpcErrorMessage: String { "foobar" }
319+
var rpcErrorMetadata: Metadata { ["error": "yes"] }
320+
}
321+
322+
let tester = ClientRPCExecutorTestHarness(
323+
server: .echo,
324+
interceptors: [.throwInBodyParts(CustomError())]
325+
)
326+
327+
try await tester.unary(request: ClientRequest(message: [])) { response in
328+
XCTAssertThrowsError(ofType: RPCError.self, try response.message) { error in
329+
XCTAssertEqual(error.code, .alreadyExists)
330+
XCTAssertEqual(error.message, "foobar")
331+
XCTAssertEqual(error.metadata, ["error": "yes"])
332+
}
333+
}
334+
}
293335
}

Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTests.swift

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,4 +394,48 @@ final class ServerRPCExecutorTests: XCTestCase {
394394
XCTAssertEqual(parts, [.status(status, metadata)])
395395
}
396396
}
397+
398+
func testInterceptorProducerErrorConversion() async throws {
399+
struct CustomError: RPCErrorConvertible, Error {
400+
var rpcErrorCode: RPCError.Code { .alreadyExists }
401+
var rpcErrorMessage: String { "foobar" }
402+
var rpcErrorMetadata: Metadata { ["error": "yes"] }
403+
}
404+
405+
let harness = ServerRPCExecutorTestHarness(
406+
interceptors: [.throwInProducer(CustomError())]
407+
)
408+
try await harness.execute(handler: .echo) { inbound in
409+
try await inbound.write(.metadata(["foo": "bar"]))
410+
try await inbound.write(.message([0]))
411+
} consumer: { outbound in
412+
let parts = try await outbound.collect()
413+
let status = Status(code: .alreadyExists, message: "foobar")
414+
let metadata: Metadata = ["error": "yes"]
415+
XCTAssertEqual(parts, [.metadata(["foo": "bar"]), .message([0]), .status(status, metadata)])
416+
}
417+
}
418+
419+
func testInterceptorMessagesErrorConversion() async throws {
420+
struct CustomError: RPCErrorConvertible, Error {
421+
var rpcErrorCode: RPCError.Code { .alreadyExists }
422+
var rpcErrorMessage: String { "foobar" }
423+
var rpcErrorMetadata: Metadata { ["error": "yes"] }
424+
}
425+
426+
let harness = ServerRPCExecutorTestHarness(interceptors: [
427+
.throwInMessageSequence(CustomError())
428+
])
429+
try await harness.execute(handler: .echo) { inbound in
430+
try await inbound.write(.metadata(["foo": "bar"]))
431+
// the sequence throws instantly, this should not arrive
432+
try await inbound.write(.message([0]))
433+
await inbound.finish()
434+
} consumer: { outbound in
435+
let parts = try await outbound.collect()
436+
let status = Status(code: .alreadyExists, message: "foobar")
437+
let metadata: Metadata = ["error": "yes"]
438+
XCTAssertEqual(parts, [.metadata(["foo": "bar"]), .status(status, metadata)])
439+
}
440+
}
397441
}

Tests/GRPCCoreTests/Internal/Result+CatchingTests.swift

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,39 @@ final class ResultCatchingTests: XCTestCase {
6363
XCTAssertEqual(error, RPCError(code: .invalidArgument, message: "fallback"))
6464
}
6565
}
66+
67+
func testCastOrConvertRPCErrorConvertible() {
68+
struct ConvertibleError: Error, RPCErrorConvertible {
69+
let rpcErrorCode: RPCError.Code = .unknown
70+
let rpcErrorMessage = "foo"
71+
}
72+
73+
let result = Result<Void, any Error>.failure(ConvertibleError())
74+
let typedFailure = result.castOrConvertRPCError { _ in
75+
XCTFail("buildError(_:) was called")
76+
return RPCError(code: .failedPrecondition, message: "shouldn't happen")
77+
}
78+
79+
switch typedFailure {
80+
case .success:
81+
XCTFail()
82+
case .failure(let error):
83+
XCTAssertEqual(error, RPCError(code: .unknown, message: "foo"))
84+
}
85+
}
86+
87+
func testCastOrConvertToErrorOfIncorrectType() async {
88+
struct WrongError: Error {}
89+
let result = Result<Void, any Error>.failure(WrongError())
90+
let typedFailure = result.castOrConvertRPCError { _ in
91+
return RPCError(code: .invalidArgument, message: "fallback")
92+
}
93+
94+
switch typedFailure {
95+
case .success:
96+
XCTFail()
97+
case .failure(let error):
98+
XCTAssertEqual(error, RPCError(code: .invalidArgument, message: "fallback"))
99+
}
100+
}
66101
}

Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ extension ClientInterceptor where Self == RejectAllClientInterceptor {
2626
return RejectAllClientInterceptor(throw: error)
2727
}
2828

29+
static func throwInBodyParts(_ error: any Error) -> Self {
30+
return RejectAllClientInterceptor(throwInBodyParts: error)
31+
}
32+
33+
static func throwInProducer(_ error: any Error) -> Self {
34+
return RejectAllClientInterceptor(throwInProducer: error)
35+
}
2936
}
3037

3138
@available(gRPCSwift 2.0, *)
@@ -43,6 +50,10 @@ struct RejectAllClientInterceptor: ClientInterceptor {
4350
case `throw`(any Error)
4451
/// Reject the RPC with a given error.
4552
case reject(RPCError)
53+
/// Throw an error in the body parts sequence.
54+
case throwInBodyParts(any Error)
55+
/// Throw an error in the message producer closure.
56+
case throwInProducer(any Error)
4657
}
4758

4859
let mode: Mode
@@ -55,6 +66,14 @@ struct RejectAllClientInterceptor: ClientInterceptor {
5566
self.mode = .reject(error)
5667
}
5768

69+
init(throwInBodyParts error: any Error) {
70+
self.mode = .throwInBodyParts(error)
71+
}
72+
73+
init(throwInProducer error: any Error) {
74+
self.mode = .throwInProducer(error)
75+
}
76+
5877
func intercept<Input: Sendable, Output: Sendable>(
5978
request: StreamingClientRequest<Input>,
6079
context: ClientContext,
@@ -68,6 +87,31 @@ struct RejectAllClientInterceptor: ClientInterceptor {
6887
throw error
6988
case .reject(let error):
7089
return StreamingClientResponse(error: error)
90+
case .throwInBodyParts(let error):
91+
var response = try await next(request, context)
92+
switch response.accepted {
93+
case .success(var success):
94+
let stream = AsyncThrowingStream<
95+
StreamingClientResponse<Output>.Contents.BodyPart, any Error
96+
>.makeStream()
97+
stream.continuation.finish(throwing: error)
98+
99+
success.bodyParts = RPCAsyncSequence(wrapping: stream.stream)
100+
response.accepted = .success(success)
101+
return response
102+
case .failure:
103+
return response
104+
}
105+
case .throwInProducer(let error):
106+
let wrappedProducer = request.producer
107+
108+
var request = request
109+
request.producer = { writer in
110+
try await wrappedProducer(writer)
111+
throw error
112+
}
113+
114+
return try await next(request, context)
71115
}
72116
}
73117
}

Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ extension ServerInterceptor where Self == RejectAllServerInterceptor {
2525
static func throwError(_ error: any Error) -> Self {
2626
RejectAllServerInterceptor(throw: error)
2727
}
28+
29+
static func throwInProducer(_ error: any Error) -> Self {
30+
RejectAllServerInterceptor(throwInProducer: error)
31+
}
32+
33+
static func throwInMessageSequence(_ error: any Error) -> Self {
34+
RejectAllServerInterceptor(throwInMessageSequence: error)
35+
}
2836
}
2937

3038
@available(gRPCSwift 2.0, *)
@@ -42,6 +50,16 @@ struct RejectAllServerInterceptor: ServerInterceptor {
4250
case `throw`(any Error)
4351
/// Reject the RPC with a given error.
4452
case reject(RPCError)
53+
/// Throw in the producer closure returned.
54+
case throwInProducer(any Error)
55+
/// Throw in the async sequence that stream inbound messages.
56+
case throwInMessageSequence(any Error)
57+
}
58+
59+
private enum TimeoutResult {
60+
case `throw`(any Error)
61+
case cancelled
62+
case result(Metadata)
4563
}
4664

4765
let mode: Mode
@@ -54,6 +72,14 @@ struct RejectAllServerInterceptor: ServerInterceptor {
5472
self.mode = .reject(error)
5573
}
5674

75+
init(throwInProducer error: any Error) {
76+
self.mode = .throwInProducer(error)
77+
}
78+
79+
init(throwInMessageSequence error: any Error) {
80+
self.mode = .throwInMessageSequence(error)
81+
}
82+
5783
func intercept<Input: Sendable, Output: Sendable>(
5884
request: StreamingServerRequest<Input>,
5985
context: ServerContext,
@@ -67,6 +93,36 @@ struct RejectAllServerInterceptor: ServerInterceptor {
6793
throw error
6894
case .reject(let error):
6995
return StreamingServerResponse(error: error)
96+
case .throwInProducer(let error):
97+
var response = try await next(request, context)
98+
switch response.accepted {
99+
case .success(var success):
100+
let wrappedProducer = success.producer
101+
success.producer = { writer in
102+
try await withThrowingTaskGroup(of: Metadata.self) { group in
103+
group.addTask {
104+
try await wrappedProducer(writer)
105+
}
106+
107+
group.cancelAll()
108+
_ = try await group.next()!
109+
throw error
110+
}
111+
}
112+
113+
response.accepted = .success(success)
114+
return response
115+
case .failure:
116+
return response
117+
}
118+
case .throwInMessageSequence(let error):
119+
let stream = AsyncThrowingStream<Input, any Error>.makeStream()
120+
stream.continuation.finish(throwing: error)
121+
122+
var request = request
123+
request.messages = RPCAsyncSequence(wrapping: stream.stream)
124+
125+
return try await next(request, context)
70126
}
71127
}
72128
}

0 commit comments

Comments
 (0)