Skip to content

Commit 7e6d8fa

Browse files
authored
Add async version of QPS Benchmark Service (#1470)
Motivation: We want to provide an async/await version of the QPS Benchmark service, and be able to choose whether we want to use that or the ELF version when running the tests. Modifications: Added support for an async/await implementation of the Benchmark Service. This is currently not in use, but a follow-up CR will include changes to the Worker Service and other classes, and will enable choosing between the ELF and async/await implementations when running. Result: An async/await version of the Benchmark Service will be ready for use.
1 parent 6639e45 commit 7e6d8fa

File tree

5 files changed

+129
-7
lines changed

5 files changed

+129
-7
lines changed

Performance/QPSBenchmark/Package.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// swift-tools-version:5.4
1+
// swift-tools-version:5.6
22
/*
33
* Copyright 2020, gRPC Authors All rights reserved.
44
*
@@ -19,6 +19,7 @@ import PackageDescription
1919

2020
let package = Package(
2121
name: "QPSBenchmark",
22+
platforms: [.macOS(.v12)],
2223
products: [
2324
.executable(name: "QPSBenchmark", targets: ["QPSBenchmark"]),
2425
],

Performance/QPSBenchmark/Sources/QPSBenchmark/Model/benchmark_service.grpc.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ extension Grpc_Testing_BenchmarkServiceAsyncClientProtocol {
350350
public func streamingCall<RequestStream>(
351351
_ requests: RequestStream,
352352
callOptions: CallOptions? = nil
353-
) -> GRPCAsyncResponseStream<Grpc_Testing_SimpleResponse> where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_SimpleRequest {
353+
) -> GRPCAsyncResponseStream<Grpc_Testing_SimpleResponse> where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_SimpleRequest {
354354
return self.performAsyncBidirectionalStreamingCall(
355355
path: Grpc_Testing_BenchmarkServiceClientMetadata.Methods.streamingCall.path,
356356
requests: requests,
@@ -374,7 +374,7 @@ extension Grpc_Testing_BenchmarkServiceAsyncClientProtocol {
374374
public func streamingFromClient<RequestStream>(
375375
_ requests: RequestStream,
376376
callOptions: CallOptions? = nil
377-
) async throws -> Grpc_Testing_SimpleResponse where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_SimpleRequest {
377+
) async throws -> Grpc_Testing_SimpleResponse where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_SimpleRequest {
378378
return try await self.performAsyncClientStreamingCall(
379379
path: Grpc_Testing_BenchmarkServiceClientMetadata.Methods.streamingFromClient.path,
380380
requests: requests,
@@ -410,7 +410,7 @@ extension Grpc_Testing_BenchmarkServiceAsyncClientProtocol {
410410
public func streamingBothWays<RequestStream>(
411411
_ requests: RequestStream,
412412
callOptions: CallOptions? = nil
413-
) -> GRPCAsyncResponseStream<Grpc_Testing_SimpleResponse> where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_SimpleRequest {
413+
) -> GRPCAsyncResponseStream<Grpc_Testing_SimpleResponse> where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_SimpleRequest {
414414
return self.performAsyncBidirectionalStreamingCall(
415415
path: Grpc_Testing_BenchmarkServiceClientMetadata.Methods.streamingBothWays.path,
416416
requests: requests,

Performance/QPSBenchmark/Sources/QPSBenchmark/Model/worker_service.grpc.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ extension Grpc_Testing_WorkerServiceAsyncClientProtocol {
302302
public func runServer<RequestStream>(
303303
_ requests: RequestStream,
304304
callOptions: CallOptions? = nil
305-
) -> GRPCAsyncResponseStream<Grpc_Testing_ServerStatus> where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_ServerArgs {
305+
) -> GRPCAsyncResponseStream<Grpc_Testing_ServerStatus> where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_ServerArgs {
306306
return self.performAsyncBidirectionalStreamingCall(
307307
path: Grpc_Testing_WorkerServiceClientMetadata.Methods.runServer.path,
308308
requests: requests,
@@ -326,7 +326,7 @@ extension Grpc_Testing_WorkerServiceAsyncClientProtocol {
326326
public func runClient<RequestStream>(
327327
_ requests: RequestStream,
328328
callOptions: CallOptions? = nil
329-
) -> GRPCAsyncResponseStream<Grpc_Testing_ClientStatus> where RequestStream: AsyncSequence, RequestStream.Element == Grpc_Testing_ClientArgs {
329+
) -> GRPCAsyncResponseStream<Grpc_Testing_ClientStatus> where RequestStream: AsyncSequence & Sendable, RequestStream.Element == Grpc_Testing_ClientArgs {
330330
return self.performAsyncBidirectionalStreamingCall(
331331
path: Grpc_Testing_WorkerServiceClientMetadata.Methods.runClient.path,
332332
requests: requests,
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2022, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
import GRPC
19+
import NIOCore
20+
21+
/// Implementation of asynchronous service for benchmarking.
22+
final class AsyncBenchmarkServiceImpl: Grpc_Testing_BenchmarkServiceAsyncProvider {
23+
let interceptors: Grpc_Testing_BenchmarkServiceServerInterceptorFactoryProtocol? = nil
24+
25+
/// One request followed by one response.
26+
/// The server returns the client payload as-is.
27+
func unaryCall(
28+
request: Grpc_Testing_SimpleRequest,
29+
context: GRPCAsyncServerCallContext
30+
) async throws -> Grpc_Testing_SimpleResponse {
31+
return try AsyncBenchmarkServiceImpl.processSimpleRPC(request: request)
32+
}
33+
34+
/// Repeated sequence of one request followed by one response.
35+
/// Should be called streaming ping-pong
36+
/// The server returns the client payload as-is on each response
37+
func streamingCall(
38+
requestStream: GRPCAsyncRequestStream<Grpc_Testing_SimpleRequest>,
39+
responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_SimpleResponse>,
40+
context: GRPCAsyncServerCallContext
41+
) async throws {
42+
for try await request in requestStream {
43+
let response = try AsyncBenchmarkServiceImpl.processSimpleRPC(request: request)
44+
try await responseStream.send(response)
45+
}
46+
}
47+
48+
/// Single-sided unbounded streaming from client to server
49+
/// The server returns the client payload as-is once the client does WritesDone
50+
func streamingFromClient(
51+
requestStream: GRPCAsyncRequestStream<Grpc_Testing_SimpleRequest>,
52+
context: GRPCAsyncServerCallContext
53+
) async throws -> Grpc_Testing_SimpleResponse {
54+
context.request.logger.warning("streamingFromClient not implemented yet")
55+
throw GRPCStatus(
56+
code: .unimplemented,
57+
message: "Not implemented"
58+
)
59+
}
60+
61+
/// Single-sided unbounded streaming from server to client
62+
/// The server repeatedly returns the client payload as-is
63+
func streamingFromServer(
64+
request: Grpc_Testing_SimpleRequest,
65+
responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_SimpleResponse>,
66+
context: GRPCAsyncServerCallContext
67+
) async throws {
68+
context.request.logger.warning("streamingFromServer not implemented yet")
69+
throw GRPCStatus(
70+
code: GRPCStatus.Code.unimplemented,
71+
message: "Not implemented"
72+
)
73+
}
74+
75+
/// Two-sided unbounded streaming between server to client
76+
/// Both sides send the content of their own choice to the other
77+
func streamingBothWays(
78+
requestStream: GRPCAsyncRequestStream<Grpc_Testing_SimpleRequest>,
79+
responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_SimpleResponse>,
80+
context: GRPCAsyncServerCallContext
81+
) async throws {
82+
context.request.logger.warning("streamingBothWays not implemented yet")
83+
throw GRPCStatus(
84+
code: GRPCStatus.Code.unimplemented,
85+
message: "Not implemented"
86+
)
87+
}
88+
89+
/// Make a payload for sending back to the client.
90+
private static func makePayload(
91+
type: Grpc_Testing_PayloadType,
92+
size: Int
93+
) throws -> Grpc_Testing_Payload {
94+
if type != .compressable {
95+
// Making a payload which is not compressable is hard - and not implemented in
96+
// other implementations too.
97+
throw GRPCStatus(code: .internalError, message: "Failed to make payload")
98+
}
99+
var payload = Grpc_Testing_Payload()
100+
payload.body = Data(count: size)
101+
payload.type = type
102+
return payload
103+
}
104+
105+
/// Process a simple RPC.
106+
/// - parameters:
107+
/// - request: The request from the client.
108+
/// - returns: A response to send back to the client.
109+
private static func processSimpleRPC(
110+
request: Grpc_Testing_SimpleRequest
111+
) throws -> Grpc_Testing_SimpleResponse {
112+
var response = Grpc_Testing_SimpleResponse()
113+
if request.responseSize > 0 {
114+
response.payload = try self.makePayload(
115+
type: request.responseType,
116+
size: Int(request.responseSize)
117+
)
118+
}
119+
return response
120+
}
121+
}

Performance/QPSBenchmark/Sources/QPSBenchmark/Runtime/AsyncServer.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ final class AsyncQPSServer: QPSServer {
5151
let workerService = AsyncQPSServerImpl()
5252

5353
// Start the server.
54-
// TODO: Support TLS is requested.
54+
// TODO: Support TLS if requested.
5555
self.server = Server.insecure(group: self.eventLoopGroup)
5656
.withServiceProviders([workerService])
5757
.withLogger(self.logger)

0 commit comments

Comments
 (0)