Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ let packageDependencies: [Package.Dependency] = [
url: "https://github.com/apple/swift-collections.git",
from: "1.0.5"
),
.package(
url: "https://github.com/apple/swift-atomics.git",
from: "1.2.0"
),
.package(
url: "https://github.com/apple/swift-protobuf.git",
from: "1.27.0"
Expand Down Expand Up @@ -123,6 +127,7 @@ extension Target.Dependency {
name: "SwiftProtobufPluginLibrary",
package: "swift-protobuf"
)
static let atomics: Self = .product(name: "Atomics", package: "swift-atomics")
static let dequeModule: Self = .product(name: "DequeModule", package: "swift-collections")
}

Expand All @@ -146,6 +151,7 @@ extension Target {
.logging,
.protobuf,
.dequeModule,
.atomics
].appending(
.nioSSL, if: includeNIOSSL
),
Expand Down Expand Up @@ -198,7 +204,8 @@ extension Target {
.nioEmbedded,
.nioTransportServices,
.logging,
.reflectionService
.reflectionService,
.atomics
].appending(
.nioSSL, if: includeNIOSSL
),
Expand Down
7 changes: 3 additions & 4 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ extension Target {
.logging,
.protobuf,
.dequeModule,
.atomics
].appending(
.nioSSL, if: includeNIOSSL
),
Expand All @@ -192,7 +193,6 @@ extension Target {
name: "GRPCCore",
dependencies: [
.dequeModule,
.atomics
],
path: "Sources/GRPCCore",
swiftSettings: [
Expand Down Expand Up @@ -242,7 +242,6 @@ extension Target {
.nioTLS,
.cgrpcZlib,
.dequeModule,
.atomics
],
swiftSettings: [
.swiftLanguageMode(.v6),
Expand Down Expand Up @@ -383,7 +382,8 @@ extension Target {
.nioEmbedded,
.nioTransportServices,
.logging,
.reflectionService
.reflectionService,
.atomics
].appending(
.nioSSL, if: includeNIOSSL
),
Expand All @@ -401,7 +401,6 @@ extension Target {
.grpcCore,
.grpcInProcessTransport,
.dequeModule,
.atomics,
.protobuf,
],
swiftSettings: [.swiftLanguageMode(.v6), .enableUpcomingFeature("ExistentialAny")]
Expand Down
148 changes: 62 additions & 86 deletions Sources/GRPCCore/GRPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

internal import Atomics
private import Synchronization

/// A gRPC client.
///
Expand Down Expand Up @@ -110,7 +110,7 @@ internal import Atomics
/// additional resources that need their lifecycles managed you should consider using [Swift Service
/// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
public struct GRPCClient: Sendable {
public final class GRPCClient: Sendable {
/// The transport which provides a bidirectional communication channel with the server.
private let transport: any ClientTransport

Expand All @@ -123,10 +123,10 @@ public struct GRPCClient: Sendable {
private let interceptors: [any ClientInterceptor]

/// The current state of the client.
private let state: ManagedAtomic<State>
private let state: Mutex<State>

/// The state of the client.
private enum State: UInt8, AtomicValue {
private enum State: Sendable {
/// The client hasn't been started yet. Can transition to `running` or `stopped`.
case notStarted
/// The client is running and can send RPCs. Can transition to `stopping`.
Expand All @@ -137,6 +137,56 @@ public struct GRPCClient: Sendable {
/// The client has stopped, no RPCs are in flight and no more will be accepted. This state
/// is terminal.
case stopped

mutating func run() throws {
switch self {
case .notStarted:
self = .running

case .running:
throw RuntimeError(
code: .clientIsAlreadyRunning,
message: "The client is already running and can only be started once."
)

case .stopping, .stopped:
throw RuntimeError(
code: .clientIsStopped,
message: "The client has stopped and can only be started once."
)
}
}

mutating func stopped() {
self = .stopped
}

mutating func beginGracefulShutdown() -> Bool {
switch self {
case .notStarted:
self = .stopped
return false
case .running:
self = .stopping
return true
case .stopping, .stopped:
return false
}
}

func checkExecutable() throws {
switch self {
case .notStarted, .running:
// Allow .notStarted as making a request can race with 'run()'. Transports should tolerate
// queuing the request if not yet started.
()
case .stopping, .stopped:
throw RuntimeError(
code: .clientIsStopped,
message: "Client has been stopped. Can't make any more RPCs."
)
}
}
}

/// Creates a new client with the given transport, interceptors and configuration.
Expand All @@ -154,7 +204,7 @@ public struct GRPCClient: Sendable {
) {
self.transport = transport
self.interceptors = interceptors
self.state = ManagedAtomic(.notStarted)
self.state = Mutex(.notStarted)
}

/// Start the client.
Expand All @@ -165,33 +215,11 @@ public struct GRPCClient: Sendable {
/// The client, and by extension this function, can only be run once. If the client is already
/// running or has already been closed then a ``RuntimeError`` is thrown.
public func run() async throws {
let (wasNotStarted, original) = self.state.compareExchange(
expected: .notStarted,
desired: .running,
ordering: .sequentiallyConsistent
)

guard wasNotStarted else {
switch original {
case .notStarted:
// The value wasn't exchanged so the original value can't be 'notStarted'.
fatalError()
case .running:
throw RuntimeError(
code: .clientIsAlreadyRunning,
message: "The client is already running and can only be started once."
)
case .stopping, .stopped:
throw RuntimeError(
code: .clientIsStopped,
message: "The client has stopped and can only be started once."
)
}
}
try self.state.withLock { try $0.run() }

// When we exit this function we must have stopped.
// When this function exits the client must have stopped.
defer {
self.state.store(.stopped, ordering: .sequentiallyConsistent)
self.state.withLock { $0.stopped() }
}

do {
Expand All @@ -211,50 +239,9 @@ public struct GRPCClient: Sendable {
/// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
/// executing ``run()`` if you want to abruptly stop in-flight RPCs.
public func beginGracefulShutdown() {
while true {
let (wasRunning, actualState) = self.state.compareExchange(
expected: .running,
desired: .stopping,
ordering: .sequentiallyConsistent
)

// Transition from running to stopping: close the transport.
if wasRunning {
self.transport.beginGracefulShutdown()
return
}

// The expected state wasn't 'running'. There are two options:
// 1. The client isn't running yet.
// 2. The client is already stopping or stopped.
switch actualState {
case .notStarted:
// Not started: try going straight to stopped.
let (wasNotStarted, _) = self.state.compareExchange(
expected: .notStarted,
desired: .stopped,
ordering: .sequentiallyConsistent
)

// If the exchange happened then just return: the client wasn't started so there's no
// transport to start.
//
// If the exchange didn't happen then continue looping: the client must've been started by
// another thread.
if wasNotStarted {
return
} else {
continue
}

case .running:
// Unreachable: the value was exchanged and this was the expected value.
fatalError()

case .stopping, .stopped:
// No exchange happened but the client is already stopping.
return
}
let wasRunning = self.state.withLock { $0.beginGracefulShutdown() }
if wasRunning {
self.transport.beginGracefulShutdown()
}
}

Expand Down Expand Up @@ -371,18 +358,7 @@ public struct GRPCClient: Sendable {
options: CallOptions,
handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
) async throws -> ReturnValue {
switch self.state.load(ordering: .sequentiallyConsistent) {
case .notStarted, .running:
// Allow .notStarted as making a request can race with 'run()'. Transports should tolerate
// queuing the request if not yet started.
()
case .stopping, .stopped:
throw RuntimeError(
code: .clientIsStopped,
message: "Client has been stopped. Can't make any more RPCs."
)
}

try self.state.withLock { try $0.checkExecutable() }
let methodConfig = self.transport.configuration(forMethod: descriptor)
var options = options
options.formUnion(with: methodConfig)
Expand Down
Loading
Loading