Skip to content

Commit 2abc8bd

Browse files
authored
Add transport protocols and surrounding types (#1660)
Motivation: The transport protocols provide a lower-level abstraction for different conection protocols. For example, there will be an HTTP/2 transport and in the future, when an implementation arises, a separate HTTP/3 transport. This change adds these interfaces and a handful of related types which surround the transport protocols. Modifications: - Add RPC parts - Add RPC stream composed of streams inbound and outbound RPC parts - Add async sequence wrapper - Add writer protocol and wrapper - Add various currency types - Tests Results: Transport protocols and related types are in place
1 parent b76f4b4 commit 2abc8bd

12 files changed

+769
-0
lines changed
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/// Configuration values for executing an RPC.
18+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
19+
public struct ClientRPCExecutionConfiguration: Hashable, Sendable {
20+
/// The default timeout for the RPC.
21+
///
22+
/// If no reply is received in the specified amount of time the request is aborted
23+
/// with an ``RPCError`` with code ``RPCError/Code/deadlineExceeded``.
24+
///
25+
/// The actual deadline used will be the minimum of the value specified here
26+
/// and the value set by the application by the client API. If either one isn't set
27+
/// then the other value is used. If neither is set then the request has no deadline.
28+
///
29+
/// The timeout applies to the overall execution of an RPC. If, for example, a retry
30+
/// policy is set then the timeout begins when the first attempt is started and _isn't_ reset
31+
/// when subsequent attempts start.
32+
public var timeout: Duration?
33+
34+
/// The policy determining how many times, and when, the RPC is executed.
35+
///
36+
/// There are two policy types:
37+
/// 1. Retry
38+
/// 2. Hedging
39+
///
40+
/// The retry policy allows an RPC to be retried a limited number of times if the RPC
41+
/// fails with one of the configured set of status codes. RPCs are only retried if they
42+
/// fail immediately, that is, the first response part received from the server is a
43+
/// status code.
44+
///
45+
/// The hedging policy allows an RPC to be executed multiple times concurrently. Typically
46+
/// each execution will be staggered by some delay. The first successful response will be
47+
/// reported to the client. Hedging is only suitable for idempotent RPCs.
48+
public var executionPolicy: ExecutionPolicy?
49+
50+
/// Create an execution configuration.
51+
///
52+
/// - Parameters:
53+
/// - executionPolicy: The execution policy to use for the RPC.
54+
/// - timeout: The default timeout for the RPC.
55+
public init(
56+
executionPolicy: ExecutionPolicy?,
57+
timeout: Duration?
58+
) {
59+
self.executionPolicy = executionPolicy
60+
self.timeout = timeout
61+
}
62+
63+
/// Create an execution configuration with a retry policy.
64+
///
65+
/// - Parameters:
66+
/// - retryPolicy: The policy for retrying the RPC.
67+
/// - timeout: The default timeout for the RPC.
68+
public init(
69+
retryPolicy: RetryPolicy,
70+
timeout: Duration? = nil
71+
) {
72+
self.executionPolicy = .retry(retryPolicy)
73+
self.timeout = timeout
74+
}
75+
76+
/// Create an execution configuration with a hedging policy.
77+
///
78+
/// - Parameters:
79+
/// - hedgingPolicy: The policy for hedging the RPC.
80+
/// - timeout: The default timeout for the RPC.
81+
public init(
82+
hedgingPolicy: HedgingPolicy,
83+
timeout: Duration? = nil
84+
) {
85+
self.executionPolicy = .hedge(hedgingPolicy)
86+
self.timeout = timeout
87+
}
88+
}
89+
90+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
91+
extension ClientRPCExecutionConfiguration {
92+
/// The execution policy for an RPC.
93+
public enum ExecutionPolicy: Hashable, Sendable {
94+
/// Policy for retrying an RPC.
95+
///
96+
/// See ``RetryPolicy`` for more details.
97+
case retry(RetryPolicy)
98+
99+
/// Policy for hedging an RPC.
100+
///
101+
/// See ``HedgingPolicy`` for more details.
102+
case hedge(HedgingPolicy)
103+
}
104+
}
105+
106+
/// Policy for retrying an RPC.
107+
///
108+
/// gRPC retries RPCs when the first response from the server is a status code which matches
109+
/// one of the configured retryable status codes. If the server begins processing the RPC and
110+
/// first responds with metadata and later responds with a retryable status code then the RPC
111+
/// won't be retried.
112+
///
113+
/// Execution attempts are limited by ``maximumAttempts`` which includes the original attempt. The
114+
/// maximum number of attempts is limited to five.
115+
///
116+
/// Subsequent attempts are executed after some delay. The first _retry_, or second attempt, will
117+
/// be started after a randomly chosen delay between zero and ``initialBackoff``. More generally,
118+
/// the nth retry will happen after a randomly chosen delay between zero
119+
/// and `min(initialBackoff * backoffMultiplier^(n-1), maximumBackoff)`.
120+
///
121+
/// For more information see [gRFC A6 Client
122+
/// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md).
123+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
124+
public struct RetryPolicy: Hashable, Sendable {
125+
/// The maximum number of RPC attempts, including the original attempt.
126+
///
127+
/// Must be greater than one, values greater than five are treated as five.
128+
public var maximumAttempts: Int {
129+
didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) }
130+
}
131+
132+
/// The initial backoff duration.
133+
///
134+
/// The initial retry will occur after a random amount of time up to this value.
135+
///
136+
/// - Precondition: Must be greater than zero.
137+
public var initialBackoff: Duration {
138+
willSet { Self.validateInitialBackoff(newValue) }
139+
}
140+
141+
/// The maximum amount of time to backoff for.
142+
///
143+
/// - Precondition: Must be greater than zero.
144+
public var maximumBackoff: Duration {
145+
willSet { Self.validateMaxBackoff(newValue) }
146+
}
147+
148+
/// The multiplier to apply to backoff.
149+
///
150+
/// - Precondition: Must be greater than zero.
151+
public var backoffMultiplier: Double {
152+
willSet { Self.validateBackoffMultiplier(newValue) }
153+
}
154+
155+
/// The set of status codes which may be retried.
156+
///
157+
/// - Precondition: Must not be empty.
158+
public var retryableStatusCodes: Set<Status.Code> {
159+
willSet { Self.validateRetryableStatusCodes(newValue) }
160+
}
161+
162+
/// Create a new retry policy.
163+
///
164+
/// - Parameters:
165+
/// - maximumAttempts: The maximum number of attempts allowed for the RPC.
166+
/// - initialBackoff: The initial backoff period for the first retry attempt. Must be
167+
/// greater than zero.
168+
/// - maximumBackoff: The maximum period of time to wait between attempts. Must be greater than
169+
/// zero.
170+
/// - backoffMultiplier: The exponential backoff multiplier. Must be greater than zero.
171+
/// - retryableStatusCodes: The set of status codes which may be retried. Must not be empty.
172+
/// - Precondition: `maximumAttempts`, `initialBackoff`, `maximumBackoff` and `backoffMultiplier`
173+
/// must be greater than zero.
174+
/// - Precondition: `retryableStatusCodes` must not be empty.
175+
public init(
176+
maximumAttempts: Int,
177+
initialBackoff: Duration,
178+
maximumBackoff: Duration,
179+
backoffMultiplier: Double,
180+
retryableStatusCodes: Set<Status.Code>
181+
) {
182+
self.maximumAttempts = validateMaxAttempts(maximumAttempts)
183+
184+
Self.validateInitialBackoff(initialBackoff)
185+
self.initialBackoff = initialBackoff
186+
187+
Self.validateMaxBackoff(maximumBackoff)
188+
self.maximumBackoff = maximumBackoff
189+
190+
Self.validateBackoffMultiplier(backoffMultiplier)
191+
self.backoffMultiplier = backoffMultiplier
192+
193+
Self.validateRetryableStatusCodes(retryableStatusCodes)
194+
self.retryableStatusCodes = retryableStatusCodes
195+
}
196+
197+
private static func validateInitialBackoff(_ value: Duration) {
198+
precondition(value.isGreaterThanZero, "initialBackoff must be greater than zero")
199+
}
200+
201+
private static func validateMaxBackoff(_ value: Duration) {
202+
precondition(value.isGreaterThanZero, "maximumBackoff must be greater than zero")
203+
}
204+
205+
private static func validateBackoffMultiplier(_ value: Double) {
206+
precondition(value > 0, "backoffMultiplier must be greater than zero")
207+
}
208+
209+
private static func validateRetryableStatusCodes(_ value: Set<Status.Code>) {
210+
precondition(!value.isEmpty, "retryableStatusCodes mustn't be empty")
211+
}
212+
}
213+
214+
/// Policy for hedging an RPC.
215+
///
216+
/// Hedged RPCs may execute more than once on a server so only idempotent methods should
217+
/// be hedged.
218+
///
219+
/// gRPC executes the RPC at most ``maximumAttempts`` times, staggering each attempt
220+
/// by ``hedgingDelay``.
221+
///
222+
/// For more information see [gRFC A6 Client
223+
/// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md).
224+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
225+
public struct HedgingPolicy: Hashable, Sendable {
226+
/// The maximum number of RPC attempts, including the original attempt.
227+
///
228+
/// Values greater than five are treated as five.
229+
///
230+
/// - Precondition: Must be greater than one.
231+
public var maximumAttempts: Int {
232+
didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) }
233+
}
234+
235+
/// The first RPC will be sent immediately, but each subsequent RPC will be sent at intervals
236+
/// of `hedgingDelay`. Set this to zero to immediately send all RPCs.
237+
public var hedgingDelay: Duration {
238+
willSet { Self.validateHedgingDelay(newValue) }
239+
}
240+
241+
/// The set of status codes which indicate other hedged RPCs may still succeed.
242+
///
243+
/// If a non-fatal status code is returned by the server, hedged RPCs will continue.
244+
/// Otherwise, outstanding requests will be cancelled and the error returned to the
245+
/// application layer.
246+
public var nonFatalStatusCodes: Set<Status.Code>
247+
248+
/// Create a new hedging policy.
249+
///
250+
/// - Parameters:
251+
/// - maximumAttempts: The maximum number of attempts allowed for the RPC.
252+
/// - hedgingDelay: The delay between each hedged RPC.
253+
/// - nonFatalStatusCodes: The set of status codes which indicated other hedged RPCs may still
254+
/// succeed.
255+
/// - Precondition: `maximumAttempts` must be greater than zero.
256+
public init(
257+
maximumAttempts: Int,
258+
hedgingDelay: Duration,
259+
nonFatalStatusCodes: Set<Status.Code>
260+
) {
261+
self.maximumAttempts = validateMaxAttempts(maximumAttempts)
262+
263+
Self.validateHedgingDelay(hedgingDelay)
264+
self.hedgingDelay = hedgingDelay
265+
self.nonFatalStatusCodes = nonFatalStatusCodes
266+
}
267+
268+
private static func validateHedgingDelay(_ value: Duration) {
269+
precondition(
270+
value.isGreaterThanOrEqualToZero,
271+
"hedgingDelay must be greater than or equal to zero"
272+
)
273+
}
274+
}
275+
276+
private func validateMaxAttempts(_ value: Int) -> Int {
277+
precondition(value > 0, "maximumAttempts must be greater than zero")
278+
return min(value, 5)
279+
}
280+
281+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
282+
extension Duration {
283+
fileprivate var isGreaterThanZero: Bool {
284+
self.components.seconds > 0 || self.components.attoseconds > 0
285+
}
286+
287+
fileprivate var isGreaterThanOrEqualToZero: Bool {
288+
self.components.seconds >= 0 || self.components.attoseconds >= 0
289+
}
290+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/// A description of a method on a service.
18+
public struct MethodDescriptor: Sendable, Hashable {
19+
/// The name of the service, including the package name.
20+
///
21+
/// For example, the name of the "Greeter" service in "helloworld" package
22+
/// is "helloworld.Greeter".
23+
public var service: String
24+
25+
/// The name of the method in the service, excluding the service name.
26+
public var method: String
27+
28+
/// The fully qualified method name in the format "package.service/method".
29+
///
30+
/// For example, the fully qualified name of the "SayHello" method of the "Greeter" service in
31+
/// "helloworld" package is "helloworld.Greeter/SayHelllo".
32+
public var fullyQualifiedMethod: String {
33+
"\(self.service)/\(self.method)"
34+
}
35+
36+
/// Creates a new method descriptor.
37+
///
38+
/// - Parameters:
39+
/// - service: The name of the service, including the package name. For example,
40+
/// "helloworld.Greeter".
41+
/// - method: The name of the method. For example, "SayHello".
42+
public init(service: String, method: String) {
43+
self.service = service
44+
self.method = method
45+
}
46+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/// A type-erasing `AsyncSequence`.
18+
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
19+
public struct RPCAsyncSequence<Element>: AsyncSequence, Sendable {
20+
private let _makeAsyncIterator: @Sendable () -> AsyncIterator
21+
22+
/// Creates an ``RPCAsyncSequence`` by wrapping another `AsyncSequence`.
23+
public init<S: AsyncSequence>(wrapping other: S) where S.Element == Element {
24+
self._makeAsyncIterator = {
25+
AsyncIterator(wrapping: other.makeAsyncIterator())
26+
}
27+
}
28+
29+
public func makeAsyncIterator() -> AsyncIterator {
30+
self._makeAsyncIterator()
31+
}
32+
33+
public struct AsyncIterator: AsyncIteratorProtocol {
34+
private var iterator: any AsyncIteratorProtocol
35+
36+
fileprivate init<Iterator>(
37+
wrapping other: Iterator
38+
) where Iterator: AsyncIteratorProtocol, Iterator.Element == Element {
39+
self.iterator = other
40+
}
41+
42+
public mutating func next() async throws -> Element? {
43+
return try await self.iterator.next() as? Element
44+
}
45+
}
46+
}
47+
48+
@available(*, unavailable)
49+
extension RPCAsyncSequence.AsyncIterator: Sendable {}

0 commit comments

Comments
 (0)