Skip to content

Commit fe75fb1

Browse files
authored
Basic client RPC executor (#1693)
Motivation: Clients can execute RPCs in a few different ways: straight up as a single attempt, with retries, or with hedging. This is done by the `ClientRPCExecutor`, the bones of which are added in this PR. At a high level, the executor takes a transport, request, serializer, deserializer and response handler and executes the request against a transport and executes the response handler if a response is received or synthesized locally. The executor only returns once the response has been handled. Modifications: - Add the `ClientRPCExecutor` and the one-shot implementation (hedging and retries will follow later) - The `ClientRPCExecutor` uses a `ClientStreamExecutor` which deals in serialized streams - Add a testing harness, which includes type erased transports (and a basic in process transport for testing) and different server behaviours. Result: Can execute one-shot requests.
1 parent c841cbb commit fe75fb1

21 files changed

+1881
-80
lines changed

Package.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ let packageDependencies: [Package.Dependency] = [
5050
url: "https://github.com/apple/swift-collections.git",
5151
from: "1.0.5"
5252
),
53+
.package(
54+
url: "https://github.com/apple/swift-atomics.git",
55+
from: "1.2.0"
56+
),
5357
.package(
5458
url: "https://github.com/apple/swift-protobuf.git",
5559
from: "1.20.2"
@@ -125,6 +129,7 @@ extension Target.Dependency {
125129
package: "swift-protobuf"
126130
)
127131
static let dequeModule: Self = .product(name: "DequeModule", package: "swift-collections")
132+
static let atomics: Self = .product(name: "Atomics", package: "swift-atomics")
128133

129134
static let grpcCore: Self = .target(name: "GRPCCore")
130135
}
@@ -224,6 +229,7 @@ extension Target {
224229
dependencies: [
225230
.grpcCore,
226231
.dequeModule,
232+
.atomics
227233
]
228234
)
229235

Sources/GRPCCore/Call/Client/ClientRequest.swift

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,3 @@ extension ClientRequest {
102102
}
103103
}
104104
}
105-
106-
// MARK: - Conversion
107-
108-
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
109-
extension ClientRequest.Stream {
110-
@_spi(Testing)
111-
public init(single request: ClientRequest.Single<Message>) {
112-
self.init(metadata: request.metadata) {
113-
try await $0.write(request.message)
114-
}
115-
}
116-
}

Sources/GRPCCore/Call/Client/ClientResponse.swift

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -366,70 +366,3 @@ extension ClientResponse.Stream {
366366
}
367367
}
368368
}
369-
370-
// MARK: - Conversion
371-
372-
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
373-
extension ClientResponse.Single {
374-
init(stream response: ClientResponse.Stream<Message>) async {
375-
switch response.accepted {
376-
case .success(let contents):
377-
do {
378-
let metadata = contents.metadata
379-
var iterator = contents.bodyParts.makeAsyncIterator()
380-
381-
// Happy path: message, trailing metadata, nil.
382-
let part1 = try await iterator.next()
383-
let part2 = try await iterator.next()
384-
let part3 = try await iterator.next()
385-
386-
switch (part1, part2, part3) {
387-
case (.some(.message(let message)), .some(.trailingMetadata(let trailingMetadata)), .none):
388-
let contents = Contents(
389-
metadata: metadata,
390-
message: message,
391-
trailingMetadata: trailingMetadata
392-
)
393-
self.accepted = .success(contents)
394-
395-
case (.some(.message), .some(.message), _):
396-
let error = RPCError(
397-
code: .unimplemented,
398-
message: """
399-
Multiple messages received, but only one is expected. The server may have \
400-
incorrectly implemented the RPC or the client and server may have a different \
401-
opinion on whether this RPC streams responses.
402-
"""
403-
)
404-
self.accepted = .failure(error)
405-
406-
case (.some(.trailingMetadata), .none, .none):
407-
let error = RPCError(
408-
code: .unimplemented,
409-
message: "No messages received, exactly one was expected."
410-
)
411-
self.accepted = .failure(error)
412-
413-
case (_, _, _):
414-
let error = RPCError(
415-
code: .internalError,
416-
message: """
417-
The stream from the client transport is invalid. This is likely to be an incorrectly \
418-
implemented transport. Received parts: \([part1, part2, part3])."
419-
"""
420-
)
421-
self.accepted = .failure(error)
422-
}
423-
} catch let error as RPCError {
424-
// Known error type.
425-
self.accepted = .failure(error)
426-
} catch {
427-
// Unexpected, but should be handled nonetheless.
428-
self.accepted = .failure(RPCError(code: .unknown, message: String(describing: error)))
429-
}
430-
431-
case .failure(let error):
432-
self.accepted = .failure(error)
433-
}
434-
}
435-
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
18+
extension ClientRPCExecutor {
19+
/// An executor for requests which doesn't apply retries or hedging. The request has just one
20+
/// attempt at execution.
21+
@usableFromInline
22+
struct OneShotExecutor<
23+
Transport: ClientTransport,
24+
Serializer: MessageSerializer,
25+
Deserializer: MessageDeserializer
26+
> {
27+
@usableFromInline
28+
typealias Input = Serializer.Message
29+
@usableFromInline
30+
typealias Output = Deserializer.Message
31+
32+
@usableFromInline
33+
let transport: Transport
34+
@usableFromInline
35+
let timeout: Duration?
36+
@usableFromInline
37+
let interceptors: [any ClientInterceptor]
38+
@usableFromInline
39+
let serializer: Serializer
40+
@usableFromInline
41+
let deserializer: Deserializer
42+
43+
@inlinable
44+
init(
45+
transport: Transport,
46+
timeout: Duration?,
47+
interceptors: [any ClientInterceptor],
48+
serializer: Serializer,
49+
deserializer: Deserializer
50+
) {
51+
self.transport = transport
52+
self.timeout = timeout
53+
self.interceptors = interceptors
54+
self.serializer = serializer
55+
self.deserializer = deserializer
56+
}
57+
}
58+
}
59+
60+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
61+
extension ClientRPCExecutor.OneShotExecutor {
62+
@inlinable
63+
func execute<R>(
64+
request: ClientRequest.Stream<Input>,
65+
method: MethodDescriptor,
66+
responseHandler: @Sendable @escaping (ClientResponse.Stream<Output>) async throws -> R
67+
) async throws -> R {
68+
let result = await withTaskGroup(
69+
of: _OneShotExecutorTask<R>.self,
70+
returning: Result<R, Error>.self
71+
) { group in
72+
if let timeout = self.timeout {
73+
group.addTask {
74+
let result = await Result {
75+
try await Task.sleep(until: .now.advanced(by: timeout), clock: .continuous)
76+
}
77+
return .timedOut(result)
78+
}
79+
}
80+
81+
let streamExecutor = ClientStreamExecutor(transport: self.transport)
82+
group.addTask {
83+
return .streamExecutorCompleted(await streamExecutor.run())
84+
}
85+
86+
group.addTask {
87+
let response = await ClientRPCExecutor.unsafeExecute(
88+
request: request,
89+
method: method,
90+
attempt: 1,
91+
serializer: self.serializer,
92+
deserializer: self.deserializer,
93+
interceptors: self.interceptors,
94+
streamProcessor: streamExecutor
95+
)
96+
97+
let result = await Result {
98+
try await responseHandler(response)
99+
}
100+
101+
return .responseHandled(result)
102+
}
103+
104+
while let result = await group.next() {
105+
switch result {
106+
case .streamExecutorCompleted(.success):
107+
// Stream finished; wait for the response to be handled.
108+
()
109+
110+
case .streamExecutorCompleted(.failure):
111+
// Stream execution threw: cancel and wait.
112+
group.cancelAll()
113+
114+
case .timedOut(.success):
115+
// The deadline passed; cancel the ongoing work group.
116+
group.cancelAll()
117+
118+
case .timedOut(.failure):
119+
// The deadline task failed (because the task was cancelled). Wait for the response
120+
// to be handled.
121+
()
122+
123+
case .responseHandled(let result):
124+
// Response handled: cancel any other remaining tasks.
125+
group.cancelAll()
126+
return result
127+
}
128+
}
129+
130+
// Unreachable: exactly one task returns `responseHandled` and we return when it completes.
131+
fatalError("Internal inconsistency")
132+
}
133+
134+
return try result.get()
135+
}
136+
}
137+
138+
@usableFromInline
139+
enum _OneShotExecutorTask<R> {
140+
case streamExecutorCompleted(Result<Void, RPCError>)
141+
case timedOut(Result<Void, Error>)
142+
case responseHandled(Result<R, Error>)
143+
}

0 commit comments

Comments
 (0)