Skip to content

Commit bdb7458

Browse files
BenchmarkClient RPCs implementation (#1861)
Motivation: When the WorkerService holds and monitors BenchmarkClients, they need to make requests to the server as configured in the config input that the WorkerService receives from the Test Driver. Modifications: - implemented a helper function that computes the latency and extracts the error code for an RPC - implemented the body of the makeRPC function that makes one of the 5 possible RPCs Result: The BenchmarkClient implementation for performance testing is now complete.
1 parent a637cf3 commit bdb7458

File tree

3 files changed

+200
-15
lines changed

3 files changed

+200
-15
lines changed

Sources/performance-worker/BenchmarkClient.swift

Lines changed: 149 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,23 @@ import NIOConcurrencyHelpers
2222
struct BenchmarkClient {
2323
private var client: GRPCClient
2424
private var rpcNumber: Int32
25-
private var rpcType: Grpc_Testing_RpcType
25+
private var rpcType: RPCType
26+
private var messagesPerStream: Int32
27+
private var protoParams: Grpc_Testing_SimpleProtoParams
2628
private let rpcStats: NIOLockedValueBox<RPCStats>
2729

2830
init(
2931
client: GRPCClient,
3032
rpcNumber: Int32,
31-
rpcType: Grpc_Testing_RpcType,
33+
rpcType: RPCType,
34+
messagesPerStream: Int32,
35+
protoParams: Grpc_Testing_SimpleProtoParams,
3236
histogramParams: Grpc_Testing_HistogramParams?
3337
) {
3438
self.client = client
3539
self.rpcNumber = rpcNumber
40+
self.messagesPerStream = messagesPerStream
41+
self.protoParams = protoParams
3642
self.rpcType = rpcType
3743

3844
let histogram: RPCStats.LatencyHistogram
@@ -48,6 +54,14 @@ struct BenchmarkClient {
4854
self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
4955
}
5056

57+
enum RPCType {
58+
case unary
59+
case streaming
60+
case streamingFromClient
61+
case streamingFromServer
62+
case streamingBothWays
63+
}
64+
5165
internal var currentStats: RPCStats {
5266
return self.rpcStats.withLockedValue { stats in
5367
return stats
@@ -64,7 +78,9 @@ struct BenchmarkClient {
6478
try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
6579
for _ in 0 ..< self.rpcNumber {
6680
rpcsGroup.addTask {
67-
let (latency, errorCode) = self.makeRPC(client: benchmarkClient, rpcType: self.rpcType)
81+
let (latency, errorCode) = try await self.makeRPC(
82+
benchmarkClient: benchmarkClient
83+
)
6884
self.rpcStats.withLockedValue {
6985
$0.latencyHistogram.record(latency)
7086
if let errorCode = errorCode {
@@ -80,19 +96,138 @@ struct BenchmarkClient {
8096
}
8197
}
8298

99+
private func timeIt<R>(
100+
_ body: () async throws -> R
101+
) async rethrows -> (R, nanoseconds: Double) {
102+
let startTime = DispatchTime.now().uptimeNanoseconds
103+
let result = try await body()
104+
let endTime = DispatchTime.now().uptimeNanoseconds
105+
return (result, nanoseconds: Double(endTime - startTime))
106+
}
107+
83108
// The result is the number of nanoseconds for processing the RPC.
84109
private func makeRPC(
85-
client: Grpc_Testing_BenchmarkServiceClient,
86-
rpcType: Grpc_Testing_RpcType
87-
) -> (latency: Double, errorCode: RPCError.Code?) {
88-
switch rpcType {
89-
case .unary, .streaming, .streamingFromClient, .streamingFromServer, .streamingBothWays,
90-
.UNRECOGNIZED:
91-
let startTime = DispatchTime.now().uptimeNanoseconds
92-
let endTime = DispatchTime.now().uptimeNanoseconds
93-
return (
94-
latency: Double(endTime - startTime), errorCode: RPCError.Code(.unimplemented)
95-
)
110+
benchmarkClient: Grpc_Testing_BenchmarkServiceClient
111+
) async throws -> (latency: Double, errorCode: RPCError.Code?) {
112+
let message = Grpc_Testing_SimpleRequest.with {
113+
$0.responseSize = self.protoParams.respSize
114+
$0.payload = Grpc_Testing_Payload.with {
115+
$0.body = Data(count: Int(self.protoParams.reqSize))
116+
}
117+
}
118+
119+
switch self.rpcType {
120+
case .unary:
121+
let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
122+
do {
123+
try await benchmarkClient.unaryCall(
124+
request: ClientRequest.Single(message: message)
125+
) { response in
126+
_ = try response.message
127+
}
128+
return nil
129+
} catch let error as RPCError {
130+
return error.code
131+
} catch {
132+
return .unknown
133+
}
134+
}
135+
return (latency: nanoseconds, errorCode)
136+
137+
// Repeated sequence of one request followed by one response.
138+
// It is a ping-pong of messages between the client and the server.
139+
case .streaming:
140+
let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
141+
do {
142+
let ids = AsyncStream.makeStream(of: Int.self)
143+
let streamingRequest = ClientRequest.Stream { writer in
144+
for try await id in ids.stream {
145+
if id <= self.messagesPerStream {
146+
try await writer.write(message)
147+
} else {
148+
return
149+
}
150+
}
151+
}
152+
153+
ids.continuation.yield(1)
154+
155+
try await benchmarkClient.streamingCall(request: streamingRequest) { response in
156+
var id = 1
157+
for try await _ in response.messages {
158+
id += 1
159+
ids.continuation.yield(id)
160+
}
161+
}
162+
return nil
163+
} catch let error as RPCError {
164+
return error.code
165+
} catch {
166+
return .unknown
167+
}
168+
}
169+
return (latency: nanoseconds, errorCode)
170+
171+
case .streamingFromClient:
172+
let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
173+
do {
174+
let streamingRequest = ClientRequest.Stream { writer in
175+
for _ in 1 ... self.messagesPerStream {
176+
try await writer.write(message)
177+
}
178+
}
179+
180+
try await benchmarkClient.streamingFromClient(
181+
request: streamingRequest
182+
) { response in
183+
_ = try response.message
184+
}
185+
return nil
186+
} catch let error as RPCError {
187+
return error.code
188+
} catch {
189+
return .unknown
190+
}
191+
}
192+
return (latency: nanoseconds, errorCode)
193+
194+
case .streamingFromServer:
195+
let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
196+
do {
197+
try await benchmarkClient.streamingFromServer(
198+
request: ClientRequest.Single(message: message)
199+
) { response in
200+
for try await _ in response.messages {}
201+
}
202+
return nil
203+
} catch let error as RPCError {
204+
return error.code
205+
} catch {
206+
return .unknown
207+
}
208+
}
209+
return (latency: nanoseconds, errorCode)
210+
211+
case .streamingBothWays:
212+
let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
213+
do {
214+
let streamingRequest = ClientRequest.Stream { writer in
215+
for _ in 1 ... self.messagesPerStream {
216+
try await writer.write(message)
217+
}
218+
}
219+
220+
try await benchmarkClient.streamingBothWays(request: streamingRequest) { response in
221+
for try await _ in response.messages {}
222+
}
223+
return nil
224+
} catch let error as RPCError {
225+
return error.code
226+
} catch {
227+
return .unknown
228+
}
229+
}
230+
return (latency: nanoseconds, errorCode)
96231
}
97232
}
98233

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if swift(<5.9)
// Backport of the `makeStream` factory the standard library gained in
// Swift 5.9 (SE-0388), compiled only on older toolchains.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncStream {
  /// Creates a stream and returns it alongside the continuation used to
  /// publish elements into it.
  @inlinable
  static func makeStream(
    of elementType: Element.Type = Element.self,
    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
  ) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
    var capturedContinuation: AsyncStream<Element>.Continuation?
    let stream = AsyncStream(bufferingPolicy: limit) { continuation in
      capturedContinuation = continuation
    }
    // The build closure runs synchronously inside the initializer, so the
    // continuation is guaranteed to be set by this point.
    return (stream, capturedContinuation!)
  }
}
#endif

Sources/performance-worker/WorkerService.swift

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,14 +325,32 @@ extension WorkerService {
325325
}
326326

327327
private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] {
328+
let rpcType: BenchmarkClient.RPCType
329+
switch config.rpcType {
330+
case .unary:
331+
rpcType = .unary
332+
case .streaming:
333+
rpcType = .streaming
334+
case .streamingFromClient:
335+
rpcType = .streamingFromClient
336+
case .streamingFromServer:
337+
rpcType = .streamingFromServer
338+
case .streamingBothWays:
339+
rpcType = .streamingBothWays
340+
case .UNRECOGNIZED:
341+
throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.")
342+
}
343+
328344
var clients = [BenchmarkClient]()
329345
for _ in 0 ..< config.clientChannels {
330346
let grpcClient = self.makeGRPCClient()
331347
clients.append(
332348
BenchmarkClient(
333349
client: grpcClient,
334350
rpcNumber: config.outstandingRpcsPerChannel,
335-
rpcType: config.rpcType,
351+
rpcType: rpcType,
352+
messagesPerStream: config.messagesPerStream,
353+
protoParams: config.payloadConfig.simpleParams,
336354
histogramParams: config.histogramParams
337355
)
338356
)

0 commit comments

Comments
 (0)