Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ extension Target {
],
path: "Sources/GRPCCore",
swiftSettings: [
.swiftLanguageMode(.v5),
.swiftLanguageMode(.v6),
.enableUpcomingFeature("ExistentialAny"),
.enableUpcomingFeature("InternalImportsByDefault")
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ extension ClientRPCExecutor {
@usableFromInline
struct HedgingExecutor<
Transport: ClientTransport,
Input: Sendable,
Output: Sendable,
Serializer: MessageSerializer,
Deserializer: MessageDeserializer
> {
@usableFromInline
typealias Input = Serializer.Message
@usableFromInline
typealias Output = Deserializer.Message

>: Sendable where Serializer.Message == Input, Deserializer.Message == Output {
@usableFromInline
let transport: Transport
@usableFromInline
Expand Down Expand Up @@ -181,14 +178,14 @@ extension ClientRPCExecutor.HedgingExecutor {
let state = SharedState(policy: self.policy)

// There's always a first attempt, safe to '!'.
let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() })!
let result = state.withState { $0.nextAttemptNumber()! }

group.addTask {
let result = await self._startAttempt(
request: request,
method: method,
options: options,
attempt: attempt,
attempt: result.nextAttempt,
state: state,
picker: picker,
responseHandler: responseHandler
Expand All @@ -199,7 +196,7 @@ extension ClientRPCExecutor.HedgingExecutor {

// Schedule the second attempt.
var nextScheduledAttempt = ScheduledState()
if scheduleNext {
if result.scheduleNext {
nextScheduledAttempt.schedule(in: &group, pushback: false, delay: self.policy.hedgingDelay)
}

Expand All @@ -212,13 +209,13 @@ extension ClientRPCExecutor.HedgingExecutor {
switch outcome {
case .ran:
// Start a new attempt and possibly schedule the next.
if let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() }) {
if let result = state.withState({ $0.nextAttemptNumber() }) {
group.addTask {
let result = await self._startAttempt(
request: request,
method: method,
options: options,
attempt: attempt,
attempt: result.nextAttempt,
state: state,
picker: picker,
responseHandler: responseHandler
Expand All @@ -227,7 +224,7 @@ extension ClientRPCExecutor.HedgingExecutor {
}

// Schedule the next attempt.
if scheduleNext {
if result.scheduleNext {
nextScheduledAttempt.schedule(
in: &group,
pushback: false,
Expand Down Expand Up @@ -265,13 +262,13 @@ extension ClientRPCExecutor.HedgingExecutor {

nextScheduledAttempt.cancel()

if let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() }) {
if let result = state.withState({ $0.nextAttemptNumber() }) {
group.addTask {
let result = await self._startAttempt(
request: request,
method: method,
options: options,
attempt: attempt,
attempt: result.nextAttempt,
state: state,
picker: picker,
responseHandler: responseHandler
Expand All @@ -280,7 +277,7 @@ extension ClientRPCExecutor.HedgingExecutor {
}

// Schedule the next retry.
if scheduleNext {
if result.scheduleNext {
nextScheduledAttempt.schedule(
in: &group,
pushback: true,
Expand Down Expand Up @@ -314,7 +311,7 @@ extension ClientRPCExecutor.HedgingExecutor {
}

@inlinable
func _startAttempt<R>(
func _startAttempt<R: Sendable>(
request: ClientRequest.Stream<Input>,
method: MethodDescriptor,
options: CallOptions,
Expand Down Expand Up @@ -431,7 +428,7 @@ extension ClientRPCExecutor.HedgingExecutor {
}

@usableFromInline
final class SharedState {
final class SharedState: Sendable {
@usableFromInline
let state: Mutex<State>

Expand All @@ -441,15 +438,15 @@ extension ClientRPCExecutor.HedgingExecutor {
}

@inlinable
func withState<ReturnType>(_ body: @Sendable (inout State) -> ReturnType) -> ReturnType {
func withState<ReturnType: Sendable>(_ body: (inout State) -> ReturnType) -> ReturnType {
self.state.withLock {
body(&$0)
}
}
}

@usableFromInline
struct State {
struct State: Sendable {
@usableFromInline
let _maximumAttempts: Int
@usableFromInline
Expand All @@ -474,14 +471,31 @@ extension ClientRPCExecutor.HedgingExecutor {
}
}

@usableFromInline
struct NextAttemptResult: Sendable {
@usableFromInline
var nextAttempt: Int
@usableFromInline
var scheduleNext: Bool

@inlinable
init(nextAttempt: Int, scheduleNext: Bool) {
self.nextAttempt = nextAttempt
self.scheduleNext = scheduleNext
}
}

@inlinable
mutating func nextAttemptNumber() -> (Int, Bool)? {
mutating func nextAttemptNumber() -> NextAttemptResult? {
if self.hasUsableResponse || self.attempt > self._maximumAttempts {
return nil
} else {
let attempt = self.attempt
self.attempt += 1
return (attempt, self.attempt <= self._maximumAttempts)
return NextAttemptResult(
nextAttempt: attempt,
scheduleNext: self.attempt <= self._maximumAttempts
)
}
}
}
Expand Down Expand Up @@ -533,28 +547,28 @@ extension ClientRPCExecutor.HedgingExecutor {

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
@usableFromInline
enum _HedgingTaskResult<R> {
enum _HedgingTaskResult<R: Sendable>: Sendable {
case rpcHandled(Result<R, any Error>)
case finishedRequest(Result<Void, any Error>)
case timedOut(Result<Void, any Error>)
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@usableFromInline
enum _HedgingAttemptTaskResult<R, Output> {
enum _HedgingAttemptTaskResult<R: Sendable, Output: Sendable>: Sendable {
case attemptPicked(Bool)
case attemptCompleted(AttemptResult)
case scheduledAttemptFired(ScheduleEvent)

@usableFromInline
enum AttemptResult {
enum AttemptResult: Sendable {
case unusableResponse(ClientResponse.Stream<Output>, Metadata.RetryPushback?)
case usableResponse(Result<R, any Error>)
case noStreamAvailable(any Error)
}

@usableFromInline
enum ScheduleEvent {
enum ScheduleEvent: Sendable {
case ran
case cancelled
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ extension ClientRPCExecutor {
@usableFromInline
struct OneShotExecutor<
Transport: ClientTransport,
Input: Sendable,
Output: Sendable,
Serializer: MessageSerializer,
Deserializer: MessageDeserializer
> {
@usableFromInline
typealias Input = Serializer.Message
@usableFromInline
typealias Output = Deserializer.Message

>: Sendable where Serializer.Message == Input, Deserializer.Message == Output {
@usableFromInline
let transport: Transport
@usableFromInline
Expand Down Expand Up @@ -60,7 +57,7 @@ extension ClientRPCExecutor {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension ClientRPCExecutor.OneShotExecutor {
@inlinable
func execute<R>(
func execute<R: Sendable>(
request: ClientRequest.Stream<Input>,
method: MethodDescriptor,
options: CallOptions,
Expand All @@ -71,9 +68,10 @@ extension ClientRPCExecutor.OneShotExecutor {
if let deadline = self.deadline {
var request = request
request.metadata.timeout = ContinuousClock.now.duration(to: deadline)
let immutableRequest = request
result = await withDeadline(deadline) {
await self._execute(
request: request,
request: immutableRequest,
method: method,
options: options,
responseHandler: responseHandler
Expand All @@ -95,7 +93,7 @@ extension ClientRPCExecutor.OneShotExecutor {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension ClientRPCExecutor.OneShotExecutor {
@inlinable
func _execute<R>(
func _execute<R: Sendable>(
request: ClientRequest.Stream<Input>,
method: MethodDescriptor,
options: CallOptions,
Expand Down Expand Up @@ -133,9 +131,9 @@ extension ClientRPCExecutor.OneShotExecutor {

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@inlinable
func withDeadline<Result>(
func withDeadline<Result: Sendable>(
_ deadline: ContinuousClock.Instant,
execute: @escaping () async -> Result
execute: @Sendable @escaping () async -> Result
) async -> Result {
return await withTaskGroup(of: _DeadlineChildTaskResult<Result>.self) { group in
group.addTask {
Expand Down Expand Up @@ -173,7 +171,7 @@ func withDeadline<Result>(
}

@usableFromInline
enum _DeadlineChildTaskResult<Value> {
enum _DeadlineChildTaskResult<Value: Sendable>: Sendable {
case deadlinePassed
case timeoutCancelled
case taskCompleted(Value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ extension ClientRPCExecutor {
@usableFromInline
struct RetryExecutor<
Transport: ClientTransport,
Input: Sendable,
Output: Sendable,
Serializer: MessageSerializer,
Deserializer: MessageDeserializer
> {
@usableFromInline
typealias Input = Serializer.Message
@usableFromInline
typealias Output = Deserializer.Message

>: Sendable where Serializer.Message == Input, Deserializer.Message == Output {
@usableFromInline
let transport: Transport
@usableFromInline
Expand Down Expand Up @@ -198,7 +195,7 @@ extension ClientRPCExecutor.RetryExecutor {
}

@inlinable
func executeAttempt<R>(
func executeAttempt<R: Sendable>(
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
metadata: Metadata,
retryStream: BroadcastAsyncSequence<Input>,
Expand Down Expand Up @@ -307,7 +304,7 @@ extension ClientRPCExecutor.RetryExecutor {

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
@usableFromInline
enum _RetryExecutorTask<R> {
enum _RetryExecutorTask<R: Sendable>: Sendable {
case timedOut(Result<Void, any Error>)
case handledResponse(Result<R, any Error>)
case retry(Duration?)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ internal enum ClientStreamExecutor {
}

let bodyParts = RawBodyPartToMessageSequence(
base: AsyncIteratorSequence(iterator.wrappedValue),
base: UncheckedAsyncIteratorSequence(iterator.wrappedValue),
deserializer: deserializer
)

Expand Down Expand Up @@ -168,7 +168,7 @@ internal enum ClientStreamExecutor {
Message: Sendable,
Deserializer: MessageDeserializer<Message>,
Failure: Error
>: AsyncSequence {
>: AsyncSequence, Sendable where Base: Sendable {
@usableFromInline
typealias Element = AsyncIterator.Element

Expand Down
4 changes: 2 additions & 2 deletions Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ struct ServerRPCExecutor {
ServerRequest.Stream<Input>
) async throws -> ServerResponse.Stream<Output>
) async {
let messages = AsyncIteratorSequence(inbound.wrappedValue).map { part throws -> Input in
let messages = UncheckedAsyncIteratorSequence(inbound.wrappedValue).map { part in
switch part {
case .message(let bytes):
return try deserializer.deserialize(bytes)
Expand Down Expand Up @@ -284,7 +284,7 @@ struct ServerRPCExecutor {
}

@usableFromInline
enum ServerExecutorTask {
enum ServerExecutorTask: Sendable {
case timedOut(Result<Void, any Error>)
case executed
}
Expand Down
8 changes: 4 additions & 4 deletions Sources/GRPCCore/GRPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public final class GRPCClient: Sendable {
/// - handler: A unary response handler.
///
/// - Returns: The return value from the `handler`.
public func unary<Request, Response, ReturnValue>(
public func unary<Request, Response, ReturnValue: Sendable>(
request: ClientRequest.Single<Request>,
descriptor: MethodDescriptor,
serializer: some MessageSerializer<Request>,
Expand Down Expand Up @@ -287,7 +287,7 @@ public final class GRPCClient: Sendable {
/// - handler: A unary response handler.
///
/// - Returns: The return value from the `handler`.
public func clientStreaming<Request, Response, ReturnValue>(
public func clientStreaming<Request, Response, ReturnValue: Sendable>(
request: ClientRequest.Stream<Request>,
descriptor: MethodDescriptor,
serializer: some MessageSerializer<Request>,
Expand Down Expand Up @@ -318,7 +318,7 @@ public final class GRPCClient: Sendable {
/// - handler: A response stream handler.
///
/// - Returns: The return value from the `handler`.
public func serverStreaming<Request, Response, ReturnValue>(
public func serverStreaming<Request, Response, ReturnValue: Sendable>(
request: ClientRequest.Single<Request>,
descriptor: MethodDescriptor,
serializer: some MessageSerializer<Request>,
Expand Down Expand Up @@ -350,7 +350,7 @@ public final class GRPCClient: Sendable {
/// - handler: A response stream handler.
///
/// - Returns: The return value from the `handler`.
public func bidirectionalStreaming<Request, Response, ReturnValue>(
public func bidirectionalStreaming<Request, Response, ReturnValue: Sendable>(
request: ClientRequest.Stream<Request>,
descriptor: MethodDescriptor,
serializer: some MessageSerializer<Request>,
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ extension TaskGroup {
}

@usableFromInline
enum _ResultOrCancelled {
enum _ResultOrCancelled: Sendable {
case result(ChildTaskResult)
case cancelled
}
Expand Down
Loading
Loading