Skip to content

Commit 62653ba

Browse files
QPS Benchmark Service implementation (#1828)
Motivation: The 2 workers used in QPS testing are a Benchmark service client and server respectivelly, so we need to implement the Benchmark Service. Modifications: Implemented the 'BenchmarkService' struct that defines the service protocol methods, based on their documentation. Result: We will be able to proceed with the Wrorker Service implementation.
1 parent 3c65b40 commit 62653ba

File tree

1 file changed

+176
-0
lines changed

1 file changed

+176
-0
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Atomics
18+
import GRPCCore
19+
20+
import struct Foundation.Data
21+
22+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
23+
struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
24+
/// Used to check if the server can be streaming responses.
25+
private let working = ManagedAtomic<Bool>(true)
26+
27+
/// One request followed by one response.
28+
/// The server returns a client payload with the size requested by the client.
29+
func unaryCall(
30+
request: GRPCCore.ServerRequest.Single<Grpc_Testing_BenchmarkService.Method.UnaryCall.Input>
31+
) async throws
32+
-> GRPCCore.ServerResponse.Single<Grpc_Testing_BenchmarkService.Method.UnaryCall.Output>
33+
{
34+
// Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent
35+
// if the request is successful.
36+
if request.message.responseStatus.isInitialized {
37+
try self.checkOkStatus(request.message.responseStatus)
38+
}
39+
40+
return ServerResponse.Single(
41+
message: Grpc_Testing_BenchmarkService.Method.UnaryCall.Output.with {
42+
$0.payload = Grpc_Testing_Payload.with {
43+
$0.body = Data(count: Int(request.message.responseSize))
44+
}
45+
}
46+
)
47+
}
48+
49+
/// Repeated sequence of one request followed by one response.
50+
/// The server returns a payload with the size requested by the client for each received message.
51+
func streamingCall(
52+
request: GRPCCore.ServerRequest.Stream<Grpc_Testing_BenchmarkService.Method.StreamingCall.Input>
53+
) async throws
54+
-> GRPCCore.ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingCall.Output>
55+
{
56+
return ServerResponse.Stream { writer in
57+
for try await message in request.messages {
58+
if message.responseStatus.isInitialized {
59+
try self.checkOkStatus(message.responseStatus)
60+
}
61+
try await writer.write(
62+
Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
63+
$0.payload = Grpc_Testing_Payload.with {
64+
$0.body = Data(count: Int(message.responseSize))
65+
}
66+
}
67+
)
68+
}
69+
return [:]
70+
}
71+
}
72+
73+
/// Single-sided unbounded streaming from client to server.
74+
/// The server returns a payload with the size requested by the client once the client does WritesDone.
75+
func streamingFromClient(
76+
request: ServerRequest.Stream<Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Input>
77+
) async throws
78+
-> ServerResponse.Single<Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output>
79+
{
80+
var responseSize = 0
81+
for try await message in request.messages {
82+
if message.responseStatus.isInitialized {
83+
try self.checkOkStatus(message.responseStatus)
84+
}
85+
responseSize = Int(message.responseSize)
86+
}
87+
88+
return ServerResponse.Single(
89+
message: Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output.with {
90+
$0.payload = Grpc_Testing_Payload.with {
91+
$0.body = Data(count: responseSize)
92+
}
93+
}
94+
)
95+
}
96+
97+
/// Single-sided unbounded streaming from server to client.
98+
/// The server repeatedly returns a payload with the size requested by the client.
99+
func streamingFromServer(
100+
request: ServerRequest.Single<Grpc_Testing_BenchmarkService.Method.StreamingFromServer.Input>
101+
) async throws
102+
-> ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingFromServer.Output>
103+
{
104+
if request.message.responseStatus.isInitialized {
105+
try self.checkOkStatus(request.message.responseStatus)
106+
}
107+
let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
108+
$0.payload = Grpc_Testing_Payload.with {
109+
$0.body = Data(count: Int(request.message.responseSize))
110+
}
111+
}
112+
return ServerResponse.Stream { writer in
113+
while working.load(ordering: .relaxed) {
114+
try await writer.write(response)
115+
}
116+
return [:]
117+
}
118+
}
119+
120+
/// Two-sided unbounded streaming between server to client.
121+
/// Both sides send the content of their own choice to the other.
122+
func streamingBothWays(
123+
request: GRPCCore.ServerRequest.Stream<
124+
Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Input
125+
>
126+
) async throws
127+
-> ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Output>
128+
{
129+
// The 100 size is used by the other implementations as well.
130+
// We are using the same canned response size for all responses
131+
// as it is allowed by the spec.
132+
let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
133+
$0.payload = Grpc_Testing_Payload.with {
134+
$0.body = Data(count: 100)
135+
}
136+
}
137+
138+
// Marks if the inbound streaming is ongoing or finished.
139+
let inboundStreaming = ManagedAtomic<Bool>(true)
140+
141+
return ServerResponse.Stream { writer in
142+
try await withThrowingTaskGroup(of: Void.self) { group in
143+
group.addTask {
144+
for try await message in request.messages {
145+
if message.responseStatus.isInitialized {
146+
try self.checkOkStatus(message.responseStatus)
147+
}
148+
}
149+
inboundStreaming.store(false, ordering: .relaxed)
150+
}
151+
group.addTask {
152+
while inboundStreaming.load(ordering: .relaxed)
153+
&& self.working.load(ordering: .acquiring)
154+
{
155+
try await writer.write(response)
156+
}
157+
}
158+
try await group.next()
159+
group.cancelAll()
160+
return [:]
161+
}
162+
}
163+
}
164+
}
165+
166+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
167+
extension BenchmarkService {
168+
private func checkOkStatus(_ responseStatus: Grpc_Testing_EchoStatus) throws {
169+
guard let code = Status.Code(rawValue: Int(responseStatus.code)) else {
170+
throw RPCError(code: .invalidArgument, message: "The response status code is invalid.")
171+
}
172+
if let code = RPCError.Code(code) {
173+
throw RPCError(code: code, message: responseStatus.message)
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)