Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

public import Synchronization // would be internal but for usableFromInline

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension ClientRPCExecutor {
@usableFromInline
Expand Down Expand Up @@ -176,10 +178,10 @@ extension ClientRPCExecutor.HedgingExecutor {
// of this. To avoid this each attempt goes via a state check before yielding to the sequence
// ensuring that only one response is used. (If this wasn't the case the response handler
// could be invoked more than once.)
let state = LockedValueBox(State(policy: self.policy))
let state = SharedState(policy: self.policy)

// There's always a first attempt, safe to '!'.
let (attempt, scheduleNext) = state.withLockedValue({ $0.nextAttemptNumber() })!
let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() })!

group.addTask {
let result = await self._startAttempt(
Expand Down Expand Up @@ -210,7 +212,7 @@ extension ClientRPCExecutor.HedgingExecutor {
switch outcome {
case .ran:
// Start a new attempt and possibly schedule the next.
if let (attempt, scheduleNext) = state.withLockedValue({ $0.nextAttemptNumber() }) {
if let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() }) {
group.addTask {
let result = await self._startAttempt(
request: request,
Expand Down Expand Up @@ -263,7 +265,7 @@ extension ClientRPCExecutor.HedgingExecutor {

nextScheduledAttempt.cancel()

if let (attempt, scheduleNext) = state.withLockedValue({ $0.nextAttemptNumber() }) {
if let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() }) {
group.addTask {
let result = await self._startAttempt(
request: request,
Expand Down Expand Up @@ -317,7 +319,7 @@ extension ClientRPCExecutor.HedgingExecutor {
method: MethodDescriptor,
options: CallOptions,
attempt: Int,
state: LockedValueBox<State>,
state: SharedState,
picker: (stream: BroadcastAsyncSequence<Int>, continuation: BroadcastAsyncSequence<Int>.Source),
responseHandler: @Sendable @escaping (ClientResponse.Stream<Output>) async throws -> R
) async -> _HedgingAttemptTaskResult<R, Output>.AttemptResult {
Expand Down Expand Up @@ -364,7 +366,7 @@ extension ClientRPCExecutor.HedgingExecutor {
case .success:
self.transport.retryThrottle?.recordSuccess()

if state.withLockedValue({ $0.receivedUsableResponse() }) {
if state.withState({ $0.receivedUsableResponse() }) {
try? await picker.continuation.write(attempt)
picker.continuation.finish()
let result = await Result { try await responseHandler(response) }
Expand All @@ -385,7 +387,7 @@ extension ClientRPCExecutor.HedgingExecutor {
// A fatal error code counts as a success to the throttle.
self.transport.retryThrottle?.recordSuccess()

if state.withLockedValue({ $0.receivedUsableResponse() }) {
if state.withState({ $0.receivedUsableResponse() }) {
try! await picker.continuation.write(attempt)
picker.continuation.finish()
let result = await Result { try await responseHandler(response) }
Expand Down Expand Up @@ -428,6 +430,24 @@ extension ClientRPCExecutor.HedgingExecutor {
}
}

@usableFromInline
final class SharedState {
@usableFromInline
let state: Mutex<State>

@inlinable
init(policy: HedgingPolicy) {
self.state = Mutex(State(policy: policy))
}

@inlinable
func withState<ReturnType>(_ body: @Sendable (inout State) -> ReturnType) -> ReturnType {
self.state.withLock {
body(&$0)
}
}
}

@usableFromInline
struct State {
@usableFromInline
Expand Down
16 changes: 8 additions & 8 deletions Sources/GRPCCore/Internal/Concurrency Primitives/Lock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -237,48 +237,48 @@ extension UnsafeMutablePointer {
}

@usableFromInline
package struct LockedValueBox<Value> {
struct LockedValueBox<Value> {
@usableFromInline
let storage: LockStorage<Value>

@inlinable
package init(_ value: Value) {
init(_ value: Value) {
self.storage = .create(value: value)
}

@inlinable
package func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
return try self.storage.withLockedValue(mutate)
}

/// An unsafe view over the locked value box.
///
/// Prefer ``withLockedValue(_:)`` where possible.
@usableFromInline
package var unsafe: Unsafe {
var unsafe: Unsafe {
Unsafe(storage: self.storage)
}

@usableFromInline
package struct Unsafe {
struct Unsafe {
@usableFromInline
let storage: LockStorage<Value>

/// Manually acquire the lock.
@inlinable
package func lock() {
func lock() {
self.storage.lock()
}

/// Manually release the lock.
@inlinable
package func unlock() {
func unlock() {
self.storage.unlock()
}

/// Mutate the value, assuming the lock has been acquired manually.
@inlinable
package func withValueAssumingLockIsAcquired<T>(
func withValueAssumingLockIsAcquired<T>(
_ mutate: (inout Value) throws -> T
) rethrows -> T {
return try self.storage.withUnsafeMutablePointerToHeader { value in
Expand Down
20 changes: 11 additions & 9 deletions Sources/GRPCCore/Transport/RetryThrottle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

private import Synchronization

/// A throttle used to rate-limit retries and hedging attempts.
///
/// gRPC prevents servers from being overloaded by retries and hedging by using a token-based
Expand All @@ -28,13 +30,14 @@
/// the server.
///
/// See also [gRFC A6: client retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md).
public struct RetryThrottle: Sendable {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
public final class RetryThrottle: Sendable {
// Note: only three figures after the decimal point from the original token ratio are used so
// all computation is done a scaled number of tokens (tokens * 1000). This allows us to do all
// computation in integer space.

/// The number of tokens available, multiplied by 1000.
private let scaledTokensAvailable: LockedValueBox<Int>
private let scaledTokensAvailable: Mutex<Int>
/// The number of tokens, multiplied by 1000.
private let scaledTokenRatio: Int
/// The maximum number of tokens, multiplied by 1000.
Expand Down Expand Up @@ -66,14 +69,14 @@ public struct RetryThrottle: Sendable {
/// If this value is less than or equal to the retry threshold (defined as `maximumTokens / 2`)
/// then RPCs will not be retried and hedging will be disabled.
public var tokens: Double {
self.scaledTokensAvailable.withLockedValue {
self.scaledTokensAvailable.withLock {
Double($0) / 1000
}
}

/// Returns whether retries and hedging are permitted at this time.
public var isRetryPermitted: Bool {
self.scaledTokensAvailable.withLockedValue {
self.scaledTokensAvailable.withLock {
$0 > self.scaledRetryThreshold
}
}
Expand All @@ -100,21 +103,20 @@ public struct RetryThrottle: Sendable {
self.scaledMaximumTokens = scaledTokens
self.scaledRetryThreshold = scaledTokens / 2
self.scaledTokenRatio = scaledTokenRatio
self.scaledTokensAvailable = LockedValueBox(scaledTokens)
self.scaledTokensAvailable = Mutex(scaledTokens)
}

/// Create a new throttle.
///
/// - Parameter policy: The policy to use to configure the throttle.
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
public init(policy: ServiceConfig.RetryThrottling) {
public convenience init(policy: ServiceConfig.RetryThrottling) {
self.init(maximumTokens: policy.maxTokens, tokenRatio: policy.tokenRatio)
}

/// Records a success, adding a token to the throttle.
@usableFromInline
func recordSuccess() {
self.scaledTokensAvailable.withLockedValue { value in
self.scaledTokensAvailable.withLock { value in
value = min(self.scaledMaximumTokens, value &+ self.scaledTokenRatio)
}
}
Expand All @@ -124,7 +126,7 @@ public struct RetryThrottle: Sendable {
@usableFromInline
@discardableResult
func recordFailure() -> Bool {
self.scaledTokensAvailable.withLockedValue { value in
self.scaledTokensAvailable.withLock { value in
value = max(0, value &- 1000)
return value <= self.scaledRetryThreshold
}
Expand Down
30 changes: 15 additions & 15 deletions Sources/GRPCHTTP2Core/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/

package import GRPCCore
internal import NIOConcurrencyHelpers
package import NIOCore
package import NIOHTTP2
private import Synchronization

/// A `Connection` provides communication to a single remote peer.
///
Expand All @@ -43,8 +43,8 @@ package import NIOHTTP2
/// }
/// }
/// ```
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
package struct Connection: Sendable {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
package final class Connection: Sendable {
/// Events which can happen over the lifetime of the connection.
package enum Event: Sendable {
/// The connect attempt succeeded and the connection is ready to use.
Expand Down Expand Up @@ -96,7 +96,7 @@ package struct Connection: Sendable {
private let http2Connector: any HTTP2Connector

/// The state of the connection.
private let state: NIOLockedValueBox<State>
private let state: Mutex<State>

/// The default max request message size in bytes, 4 MiB.
private static var defaultMaxRequestMessageSizeBytes: Int {
Expand All @@ -120,7 +120,7 @@ package struct Connection: Sendable {
self.http2Connector = http2Connector
self.event = AsyncStream.makeStream(of: Event.self)
self.input = AsyncStream.makeStream(of: Input.self)
self.state = NIOLockedValueBox(.notConnected)
self.state = Mutex(.notConnected)
}

/// Connect and run the connection.
Expand All @@ -135,7 +135,7 @@ package struct Connection: Sendable {
switch connectResult {
case .success(let connected):
// Connected successfully, update state and report the event.
self.state.withLockedValue { state in
self.state.withLock { state in
state.connected(connected)
}

Expand All @@ -151,7 +151,7 @@ package struct Connection: Sendable {
for await input in self.input.stream {
switch input {
case .close:
let asyncChannel = self.state.withLockedValue { $0.beginClosing() }
let asyncChannel = self.state.withLock { $0.beginClosing() }
if let channel = asyncChannel?.channel {
let event = ClientConnectionHandler.OutboundEvent.closeGracefully
channel.triggerUserOutboundEvent(event, promise: nil)
Expand All @@ -162,7 +162,7 @@ package struct Connection: Sendable {

case .failure(let error):
// Connect failed, this connection is no longer useful.
self.state.withLockedValue { $0.closed() }
self.state.withLock { $0.closed() }
self.finishStreams(withEvent: .connectFailed(error))
}
}
Expand All @@ -180,7 +180,7 @@ package struct Connection: Sendable {
descriptor: MethodDescriptor,
options: CallOptions
) async throws -> Stream {
let (multiplexer, scheme) = try self.state.withLockedValue { state in
let (multiplexer, scheme) = try self.state.withLock { state in
switch state {
case .connected(let connected):
return (connected.multiplexer, connected.scheme)
Expand Down Expand Up @@ -259,7 +259,7 @@ package struct Connection: Sendable {
self.event.continuation.yield(.connectSucceeded)

case .closing(let reason):
self.state.withLockedValue { $0.closing() }
self.state.withLock { $0.closing() }

switch reason {
case .goAway(let errorCode, let reason):
Expand All @@ -282,7 +282,7 @@ package struct Connection: Sendable {

let finalEvent: Event
if isReady {
let connectionCloseReason: Self.CloseReason
let connectionCloseReason: CloseReason
switch channelCloseReason {
case .keepaliveExpired:
connectionCloseReason = .keepaliveTimeout
Expand Down Expand Up @@ -323,7 +323,7 @@ package struct Connection: Sendable {
}

// The connection events sequence has finished: the connection is now closed.
self.state.withLockedValue { $0.closed() }
self.state.withLock { $0.closed() }
self.finishStreams(withEvent: finalEvent)
} catch {
let finalEvent: Event
Expand All @@ -338,7 +338,7 @@ package struct Connection: Sendable {
finalEvent = .connectFailed(makeNeverReadyError(cause: error))
}

self.state.withLockedValue { $0.closed() }
self.state.withLock { $0.closed() }
self.finishStreams(withEvent: finalEvent)
}
}
Expand All @@ -350,7 +350,7 @@ package struct Connection: Sendable {
}
}

@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension Connection {
package struct Stream {
package typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart>
Expand Down Expand Up @@ -412,7 +412,7 @@ extension Connection {
}
}

@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension Connection {
private enum State: Sendable {
/// The connection is idle or connecting.
Expand Down
Loading
Loading