Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ final class BenchmarkClient: Sendable {
}

internal func run() async throws {
let benchmarkClient = Grpc_Testing_BenchmarkServiceClient(wrapping: self.client)
let benchmarkClient = Grpc_Testing_BenchmarkService_Client(wrapping: self.client)
return try await withThrowingTaskGroup(of: Void.self) { clientGroup in
// Start the client.
clientGroup.addTask {
Expand Down Expand Up @@ -148,10 +148,10 @@ final class BenchmarkClient: Sendable {
return (result, nanoseconds: Double(endTime - startTime))
}

private func unary(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
private func unary(benchmark: Grpc_Testing_BenchmarkService_Client) async {
let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
do {
try await benchmark.unaryCall(request: ClientRequest.Single(message: self.message)) {
try await benchmark.unaryCall(request: ClientRequest(message: self.message)) {
_ = try $0.message
}
return nil
Expand All @@ -165,7 +165,7 @@ final class BenchmarkClient: Sendable {
self.record(latencyNanos: nanoseconds, errorCode: errorCode)
}

private func streaming(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
private func streaming(benchmark: Grpc_Testing_BenchmarkService_Client) async {
// Streaming RPCs ping-pong messages back and forth. To achieve this the response message
// stream is sent to the request closure, and the request closure indicates the outcome back
// to the response handler to keep the RPC alive for the appropriate amount of time.
Expand All @@ -174,7 +174,7 @@ final class BenchmarkClient: Sendable {
of: RPCAsyncSequence<Grpc_Testing_SimpleResponse, any Error>.self
)

let request = ClientRequest.Stream(of: Grpc_Testing_SimpleRequest.self) { writer in
let request = StreamingClientRequest(of: Grpc_Testing_SimpleRequest.self) { writer in
defer { status.continuation.finish() }

// The time at which the last message was sent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ final class BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
/// One request followed by one response.
/// The server returns a client payload with the size requested by the client.
func unaryCall(
request: ServerRequest.Single<Grpc_Testing_SimpleRequest>,
request: ServerRequest<Grpc_Testing_SimpleRequest>,
context: ServerContext
) async throws -> ServerResponse.Single<Grpc_Testing_SimpleResponse> {
) async throws -> ServerResponse<Grpc_Testing_SimpleResponse> {
// Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent
// if the request is successful.
if request.message.responseStatus.isInitialized {
try self.checkOkStatus(request.message.responseStatus)
}

return ServerResponse.Single(
return ServerResponse(
message: .with {
$0.payload = Grpc_Testing_Payload.with {
$0.body = Data(count: Int(request.message.responseSize))
Expand All @@ -47,10 +47,10 @@ final class BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
/// Repeated sequence of one request followed by one response.
/// The server returns a payload with the size requested by the client for each received message.
func streamingCall(
request: ServerRequest.Stream<Grpc_Testing_SimpleRequest>,
request: StreamingServerRequest<Grpc_Testing_SimpleRequest>,
context: ServerContext
) async throws -> ServerResponse.Stream<Grpc_Testing_SimpleResponse> {
return ServerResponse.Stream { writer in
) async throws -> StreamingServerResponse<Grpc_Testing_SimpleResponse> {
return StreamingServerResponse { writer in
for try await message in request.messages {
if message.responseStatus.isInitialized {
try self.checkOkStatus(message.responseStatus)
Expand All @@ -72,9 +72,9 @@ final class BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
/// Single-sided unbounded streaming from client to server.
/// The server returns a payload with the size requested by the client once the client does WritesDone.
func streamingFromClient(
request: ServerRequest.Stream<Grpc_Testing_SimpleRequest>,
request: StreamingServerRequest<Grpc_Testing_SimpleRequest>,
context: ServerContext
) async throws -> ServerResponse.Single<Grpc_Testing_SimpleResponse> {
) async throws -> ServerResponse<Grpc_Testing_SimpleResponse> {
var responseSize = 0
for try await message in request.messages {
if message.responseStatus.isInitialized {
Expand All @@ -83,7 +83,7 @@ final class BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
responseSize = Int(message.responseSize)
}

return ServerResponse.Single(
return ServerResponse(
message: .with {
$0.payload = .with {
$0.body = Data(count: responseSize)
Expand All @@ -95,9 +95,9 @@ final class BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
/// Single-sided unbounded streaming from server to client.
/// The server repeatedly returns a payload with the size requested by the client.
func streamingFromServer(
request: ServerRequest.Single<Grpc_Testing_SimpleRequest>,
request: ServerRequest<Grpc_Testing_SimpleRequest>,
context: ServerContext
) async throws -> ServerResponse.Stream<Grpc_Testing_SimpleResponse> {
) async throws -> StreamingServerResponse<Grpc_Testing_SimpleResponse> {
if request.message.responseStatus.isInitialized {
try self.checkOkStatus(request.message.responseStatus)
}
Expand All @@ -108,7 +108,7 @@ final class BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
}
}

return ServerResponse.Stream { writer in
return StreamingServerResponse { writer in
while self.working.load(ordering: .relaxed) {
try await writer.write(response)
}
Expand All @@ -119,9 +119,9 @@ final class BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
/// Two-sided unbounded streaming between server to client.
/// Both sides send the content of their own choice to the other.
func streamingBothWays(
request: ServerRequest.Stream<Grpc_Testing_SimpleRequest>,
request: StreamingServerRequest<Grpc_Testing_SimpleRequest>,
context: ServerContext
) async throws -> ServerResponse.Stream<Grpc_Testing_SimpleResponse> {
) async throws -> StreamingServerResponse<Grpc_Testing_SimpleResponse> {
// The 100 size is used by the other implementations as well.
// We are using the same canned response size for all responses
// as it is allowed by the spec.
Expand Down Expand Up @@ -150,7 +150,7 @@ final class BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
// Marks if the inbound streaming is ongoing or finished.
let inbound = InboundStreamingSignal()

return ServerResponse.Stream { writer in
return StreamingServerResponse { writer in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
for try await message in request.messages {
Expand Down
Loading
Loading