Skip to content

Commit ab82da8

Browse files
authored
Make the retry throttle optional (#1734)
Motivation: The retry throttle should be optional, it's currently required by the client transport API. The in-process transport also defaults the throttle without allowing callers to configure it. Modifications: - Make the retry throttle an optional requirement on client transport - Allow callers to configure the retry throttle for the in-process client transport - Add `async throws` to one of the in-process server transport methods, while not necessary, the protocol allows it and we may want to change the implementation in the future. Result: Retry throttle is optional and configurable for the in-process transport
1 parent a7b4240 commit ab82da8

File tree

10 files changed

+25
-18
lines changed

10 files changed

+25
-18
lines changed

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ extension ClientRPCExecutor.HedgingExecutor {
359359
case .response(let response):
360360
switch response.accepted {
361361
case .success:
362-
self.transport.retryThrottle.recordSuccess()
362+
self.transport.retryThrottle?.recordSuccess()
363363

364364
if state.withLockedValue({ $0.receivedUsableResponse() }) {
365365
try? await picker.continuation.write(attempt)
@@ -376,11 +376,11 @@ extension ClientRPCExecutor.HedgingExecutor {
376376

377377
if self.policy.nonFatalStatusCodes.contains(Status.Code(error.code)) {
378378
// The response failed and the status code is non-fatal, we can make another attempt.
379-
self.transport.retryThrottle.recordFailure()
379+
self.transport.retryThrottle?.recordFailure()
380380
return .unusableResponse(response, error.metadata.retryPushback)
381381
} else {
382382
// A fatal error code counts as a success to the throttle.
383-
self.transport.retryThrottle.recordSuccess()
383+
self.transport.retryThrottle?.recordSuccess()
384384

385385
if state.withLockedValue({ $0.receivedUsableResponse() }) {
386386
try! await picker.continuation.write(attempt)

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ extension ClientRPCExecutor.RetryExecutor {
152152
case .success:
153153
// Request was accepted. This counts as success to the throttle and there's no need
154154
// to retry.
155-
self.transport.retryThrottle.recordSuccess()
155+
self.transport.retryThrottle?.recordSuccess()
156156
retryDelayOverride = nil
157157
shouldRetry = false
158158

@@ -170,7 +170,7 @@ extension ClientRPCExecutor.RetryExecutor {
170170

171171
if isRetryableStatusCode {
172172
// Counted as failure for throttling.
173-
let throttled = self.transport.retryThrottle.recordFailure()
173+
let throttled = self.transport.retryThrottle?.recordFailure() ?? false
174174

175175
// Status code can be retried, Did the server send pushback?
176176
switch error.metadata.retryPushback {
@@ -190,7 +190,7 @@ extension ClientRPCExecutor.RetryExecutor {
190190
}
191191
} else {
192192
// Not-retryable; this is considered a success.
193-
self.transport.retryThrottle.recordSuccess()
193+
self.transport.retryThrottle?.recordSuccess()
194194
shouldRetry = false
195195
retryDelayOverride = nil
196196
}

Sources/GRPCCore/Transport/ClientTransport.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public protocol ClientTransport: Sendable {
2424
/// Client transports don't need to implement the throttle or interact with it beyond its
2525
/// creation. gRPC will record the results of requests to determine whether retries can be
2626
/// performed.
27-
var retryThrottle: RetryThrottle { get }
27+
var retryThrottle: RetryThrottle? { get }
2828

2929
/// Establish and maintain a connection to the remote destination.
3030
///

Sources/GRPCInProcessTransport/InProcessClientTransport.swift

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,16 +96,23 @@ public struct InProcessClientTransport: ClientTransport {
9696
public typealias Inbound = RPCAsyncSequence<RPCResponsePart>
9797
public typealias Outbound = RPCWriter<RPCRequestPart>.Closable
9898

99-
public let retryThrottle: RetryThrottle
99+
public let retryThrottle: RetryThrottle?
100100

101101
private let methodConfiguration: MethodConfigurations
102102
private let state: _LockedValueBox<State>
103103

104+
/// Creates a new in-process client transport.
105+
///
106+
/// - Parameters:
107+
/// - server: The in-process server transport to connect to.
108+
/// - methodConfiguration: Method specific configuration.
109+
/// - retryThrottle: A throttle to apply to RPCs which are hedged or retried.
104110
public init(
105111
server: InProcessServerTransport,
106-
methodConfiguration: MethodConfigurations = MethodConfigurations()
112+
methodConfiguration: MethodConfigurations = MethodConfigurations(),
113+
retryThrottle: RetryThrottle? = nil
107114
) {
108-
self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
115+
self.retryThrottle = retryThrottle
109116
self.methodConfiguration = methodConfiguration
110117
self.state = _LockedValueBox(.unconnected(.init(serverTransport: server)))
111118
}

Sources/GRPCInProcessTransport/InProcessServerTransport.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public struct InProcessServerTransport: ServerTransport, Sendable {
5959
/// to this transport using the ``acceptStream(_:)`` method.
6060
///
6161
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
62-
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
62+
public func listen() async throws -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
6363
RPCAsyncSequence(wrapping: self.newStreams)
6464
}
6565

Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
2020
typealias Inbound = RPCAsyncSequence<RPCResponsePart>
2121
typealias Outbound = RPCWriter<RPCRequestPart>.Closable
2222

23-
private let _retryThrottle: @Sendable () -> RetryThrottle
23+
private let _retryThrottle: @Sendable () -> RetryThrottle?
2424
private let _withStream:
2525
@Sendable (
2626
_ method: MethodDescriptor,
@@ -52,7 +52,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
5252
}
5353
}
5454

55-
var retryThrottle: RetryThrottle {
55+
var retryThrottle: RetryThrottle? {
5656
self._retryThrottle()
5757
}
5858

Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ struct StreamCountingClientTransport: ClientTransport, Sendable {
3939
self.transport = AnyClientTransport(wrapping: transport)
4040
}
4141

42-
var retryThrottle: RetryThrottle {
42+
var retryThrottle: RetryThrottle? {
4343
self.transport.retryThrottle
4444
}
4545

Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport {
2626
self.code = code
2727
}
2828

29-
let retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
29+
let retryThrottle: RetryThrottle? = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
3030

3131
func connect(lazily: Bool) async throws {
3232
// no-op

Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ final class InProcessClientTransportTests: XCTestCase {
162162
}
163163

164164
group.addTask {
165-
for try await stream in server.listen() {
165+
for try await stream in try await server.listen() {
166166
let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) }
167167
try await stream.outbound.write(RPCResponsePart.message([42]))
168168
stream.outbound.finish()

Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ final class InProcessServerTransportTests: XCTestCase {
3737
)
3838
)
3939

40-
let streamSequence = transport.listen()
40+
let streamSequence = try await transport.listen()
4141
var streamSequenceInterator = streamSequence.makeAsyncIterator()
4242

4343
try transport.acceptStream(stream)
@@ -66,7 +66,7 @@ final class InProcessServerTransportTests: XCTestCase {
6666
)
6767
)
6868

69-
let streamSequence = transport.listen()
69+
let streamSequence = try await transport.listen()
7070
var streamSequenceInterator = streamSequence.makeAsyncIterator()
7171

7272
try transport.acceptStream(firstStream)

0 commit comments

Comments
 (0)