Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Sources/GRPCCore/Call/Client/ClientResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,13 @@ extension StreamingClientResponse {
return RPCAsyncSequence.throwing(error)
}
}

/// Returns the body parts (i.e. `messages` and `trailingMetadata`) returned from the server.
///
/// For rejected RPCs (in other words, where ``accepted`` is `failure`), this method throws an `RPCError`.
public var bodyParts: RPCAsyncSequence<Contents.BodyPart, any Error> {
get throws {
try self.accepted.get().bodyParts
}
}
}
48 changes: 47 additions & 1 deletion Tests/GRPCCoreTests/Call/Client/ClientResponseTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ final class ClientResponseTests: XCTestCase {
XCTAssertEqual(response.trailingMetadata, ["bar": "baz"])
}

func testAcceptedStreamResponseConvenienceMethods() async throws {
func testAcceptedStreamResponseConvenienceMethods_Messages() async throws {
let response = StreamingClientResponse(
of: String.self,
metadata: ["foo": "bar"],
Expand All @@ -73,6 +73,29 @@ final class ClientResponseTests: XCTestCase {
XCTAssertEqual(messages, ["foo", "bar", "baz"])
}

func testAcceptedStreamResponseConvenienceMethods_BodyParts() async throws {
let response = StreamingClientResponse(
of: String.self,
metadata: ["foo": "bar"],
bodyParts: RPCAsyncSequence(
wrapping: AsyncThrowingStream {
$0.yield(.message("foo"))
$0.yield(.message("bar"))
$0.yield(.message("baz"))
$0.yield(.trailingMetadata(["baz": "baz"]))
$0.finish()
}
)
)

XCTAssertEqual(response.metadata, ["foo": "bar"])
let bodyParts = try await response.bodyParts.collect()
XCTAssertEqual(
bodyParts,
[.message("foo"), .message("bar"), .message("baz"), .trailingMetadata(["baz": "baz"])]
)
}

func testRejectedStreamResponseConvenienceMethods() async throws {
let error = RPCError(code: .aborted, message: "error message", metadata: ["bar": "baz"])
let response = StreamingClientResponse(of: String.self, error: error)
Expand All @@ -83,6 +106,11 @@ final class ClientResponseTests: XCTestCase {
} errorHandler: {
XCTAssertEqual($0, error)
}
await XCTAssertThrowsRPCErrorAsync {
try response.bodyParts
} errorHandler: {
XCTAssertEqual($0, error)
}
}

func testStreamToSingleConversionForValidStream() async throws {
Expand Down Expand Up @@ -182,3 +210,21 @@ final class ClientResponseTests: XCTestCase {
}
}
}

extension StreamingClientResponse.Contents.BodyPart: Equatable where Message: Equatable {
static func == (
lhs: StreamingClientResponse.Contents.BodyPart,
rhs: StreamingClientResponse.Contents.BodyPart
) -> Bool {
switch (lhs, rhs) {
case (.message(let lhsMessage), .message(let rhsMessage)):
return lhsMessage == rhsMessage

case (.trailingMetadata(let lhsMetadata), .trailingMetadata(let rhsMetadata)):
return lhsMetadata == rhsMetadata

default:
return false
}
}
}
Loading