Skip to content

Commit a98ec30

Browse files
committed
Add LockedState as fallback when Synchronization is not available
1 parent 8bec498 commit a98ec30

File tree

3 files changed

+257
-11
lines changed

3 files changed

+257
-11
lines changed

Sources/Subprocess/Atomic.swift

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if canImport(Synchronization)
13+
import Synchronization
14+
#else
15+
16+
#if canImport(os)
17+
internal import os
18+
#if canImport(C.os.lock)
19+
internal import C.os.lock
20+
#endif
21+
#elseif canImport(Bionic)
22+
@preconcurrency import Bionic
23+
#elseif canImport(Glibc)
24+
@preconcurrency import Glibc
25+
#elseif canImport(Musl)
26+
@preconcurrency import Musl
27+
#elseif canImport(WinSDK)
28+
import WinSDK
29+
#endif // canImport(os)
30+
31+
#endif // canImport(Synchronization)
32+
33+
internal protocol AtomicBoxProtocol: ~Copyable, Sendable {
34+
borrowing func bitwiseXor(
35+
_ operand: OutputConsumptionState
36+
) -> OutputConsumptionState
37+
38+
init(_ initialValue: OutputConsumptionState)
39+
}
40+
41+
internal struct AtomicBox<Wrapped: AtomicBoxProtocol & ~Copyable>: ~Copyable, Sendable {
42+
43+
private let storage: Wrapped
44+
45+
internal init(_ storage: consuming Wrapped) {
46+
self.storage = storage
47+
}
48+
49+
borrowing internal func bitwiseXor(
50+
_ operand: OutputConsumptionState
51+
) -> OutputConsumptionState {
52+
return self.storage.bitwiseXor(operand)
53+
}
54+
}
55+
56+
#if canImport(Synchronization)
57+
@available(macOS 15, *)
58+
extension Atomic: AtomicBoxProtocol where Value == UInt8 {
59+
borrowing func bitwiseXor(
60+
_ operand: OutputConsumptionState
61+
) -> OutputConsumptionState {
62+
let newState = self.bitwiseXor(
63+
operand.rawValue,
64+
ordering: .relaxed
65+
).newValue
66+
return OutputConsumptionState(rawValue: newState)
67+
}
68+
69+
init(_ initialValue: OutputConsumptionState) {
70+
self = Atomic(initialValue.rawValue)
71+
}
72+
}
73+
#else
74+
// Fallback to LockedState if `Synchronization` is not available
75+
extension LockedState: AtomicBoxProtocol where State == OutputConsumptionState {
76+
init(_ initialValue: OutputConsumptionState) {
77+
self.init(initialState: initialValue)
78+
}
79+
80+
func bitwiseXor(
81+
_ operand: OutputConsumptionState
82+
) -> OutputConsumptionState {
83+
return self.withLock { state in
84+
state = OutputConsumptionState(rawValue: state.rawValue ^ operand.rawValue)
85+
return state
86+
}
87+
}
88+
}
89+
90+
// MARK: - LockState
91+
internal struct LockedState<State>: ~Copyable {
92+
93+
// Internal implementation for a cheap lock to aid sharing code across platforms
94+
private struct _Lock {
95+
#if canImport(os)
96+
typealias Primitive = os_unfair_lock
97+
#elseif os(FreeBSD) || os(OpenBSD)
98+
typealias Primitive = pthread_mutex_t?
99+
#elseif canImport(Bionic) || canImport(Glibc) || canImport(Musl)
100+
typealias Primitive = pthread_mutex_t
101+
#elseif canImport(WinSDK)
102+
typealias Primitive = SRWLOCK
103+
#elseif os(WASI)
104+
// WASI is single-threaded, so we don't need a lock.
105+
typealias Primitive = Void
106+
#endif
107+
108+
typealias PlatformLock = UnsafeMutablePointer<Primitive>
109+
var _platformLock: PlatformLock
110+
111+
fileprivate static func initialize(_ platformLock: PlatformLock) {
112+
#if canImport(os)
113+
platformLock.initialize(to: os_unfair_lock())
114+
#elseif canImport(Bionic) || canImport(Glibc) || canImport(Musl)
115+
pthread_mutex_init(platformLock, nil)
116+
#elseif canImport(WinSDK)
117+
InitializeSRWLock(platformLock)
118+
#elseif os(WASI)
119+
// no-op
120+
#else
121+
#error("LockedState._Lock.initialize is unimplemented on this platform")
122+
#endif
123+
}
124+
125+
fileprivate static func deinitialize(_ platformLock: PlatformLock) {
126+
#if canImport(Bionic) || canImport(Glibc) || canImport(Musl)
127+
pthread_mutex_destroy(platformLock)
128+
#endif
129+
platformLock.deinitialize(count: 1)
130+
}
131+
132+
static fileprivate func lock(_ platformLock: PlatformLock) {
133+
#if canImport(os)
134+
os_unfair_lock_lock(platformLock)
135+
#elseif canImport(Bionic) || canImport(Glibc) || canImport(Musl)
136+
pthread_mutex_lock(platformLock)
137+
#elseif canImport(WinSDK)
138+
AcquireSRWLockExclusive(platformLock)
139+
#elseif os(WASI)
140+
// no-op
141+
#else
142+
#error("LockedState._Lock.lock is unimplemented on this platform")
143+
#endif
144+
}
145+
146+
static fileprivate func unlock(_ platformLock: PlatformLock) {
147+
#if canImport(os)
148+
os_unfair_lock_unlock(platformLock)
149+
#elseif canImport(Bionic) || canImport(Glibc) || canImport(Musl)
150+
pthread_mutex_unlock(platformLock)
151+
#elseif canImport(WinSDK)
152+
ReleaseSRWLockExclusive(platformLock)
153+
#elseif os(WASI)
154+
// no-op
155+
#else
156+
#error("LockedState._Lock.unlock is unimplemented on this platform")
157+
#endif
158+
}
159+
}
160+
161+
private class _Buffer: ManagedBuffer<State, _Lock.Primitive> {
162+
deinit {
163+
withUnsafeMutablePointerToElements {
164+
_Lock.deinitialize($0)
165+
}
166+
}
167+
}
168+
169+
private let _buffer: ManagedBuffer<State, _Lock.Primitive>
170+
171+
package init(initialState: State) {
172+
_buffer = _Buffer.create(
173+
minimumCapacity: 1,
174+
makingHeaderWith: { buf in
175+
buf.withUnsafeMutablePointerToElements {
176+
_Lock.initialize($0)
177+
}
178+
return initialState
179+
}
180+
)
181+
}
182+
183+
package func withLock<T>(_ body: @Sendable (inout State) throws -> T) rethrows -> T {
184+
try withLockUnchecked(body)
185+
}
186+
187+
package func withLockUnchecked<T>(_ body: (inout State) throws -> T) rethrows -> T {
188+
try _buffer.withUnsafeMutablePointers { state, lock in
189+
_Lock.lock(lock)
190+
defer { _Lock.unlock(lock) }
191+
return try body(&state.pointee)
192+
}
193+
}
194+
195+
// Ensures the managed state outlives the locked scope.
196+
package func withLockExtendingLifetimeOfState<T>(_ body: @Sendable (inout State) throws -> T) rethrows -> T {
197+
try _buffer.withUnsafeMutablePointers { state, lock in
198+
_Lock.lock(lock)
199+
return try withExtendedLifetime(state.pointee) {
200+
defer { _Lock.unlock(lock) }
201+
return try body(&state.pointee)
202+
}
203+
}
204+
}
205+
}
206+
207+
extension LockedState where State == Void {
208+
internal init() {
209+
self.init(initialState: ())
210+
}
211+
212+
internal func withLock<R: Sendable>(_ body: @Sendable () throws -> R) rethrows -> R {
213+
return try withLock { _ in
214+
try body()
215+
}
216+
}
217+
218+
internal func lock() {
219+
_buffer.withUnsafeMutablePointerToElements { lock in
220+
_Lock.lock(lock)
221+
}
222+
}
223+
224+
internal func unlock() {
225+
_buffer.withUnsafeMutablePointerToElements { lock in
226+
_Lock.unlock(lock)
227+
}
228+
}
229+
}
230+
231+
extension LockedState: @unchecked Sendable where State: Sendable {}
232+
233+
#endif

Sources/Subprocess/Configuration.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ extension Configuration: CustomStringConvertible, CustomDebugStringConvertible {
335335
)
336336
"""
337337
}
338+
338339
}
339340

340341
// MARK: - Cleanup

Sources/Subprocess/Execution.swift

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ import Musl
2727
import WinSDK
2828
#endif
2929

30+
#if canImport(Synchronization)
3031
import Synchronization
32+
#endif
3133

3234
/// An object that repersents a subprocess that has been
3335
/// executed. You can use this object to send signals to the
@@ -46,7 +48,11 @@ public final class Execution<
4648
internal let error: Error
4749
internal let outputPipe: CreatedPipe
4850
internal let errorPipe: CreatedPipe
49-
internal let outputConsumptionState: Atomic<OutputConsumptionState.RawValue>
51+
#if canImport(Synchronization)
52+
internal let outputConsumptionState: AtomicBox<Atomic<OutputConsumptionState.RawValue>>
53+
#else
54+
internal let outputConsumptionState: AtomicBox<LockedState<OutputConsumptionState>>
55+
#endif
5056
#if os(Windows)
5157
internal let consoleBehavior: PlatformOptions.ConsoleBehavior
5258

@@ -63,7 +69,11 @@ public final class Execution<
6369
self.error = error
6470
self.outputPipe = outputPipe
6571
self.errorPipe = errorPipe
66-
self.outputConsumptionState = Atomic(0)
72+
#if canImport(Synchronization)
73+
self.outputConsumptionState = AtomicBox(Atomic(0))
74+
#else
75+
self.outputConsumptionState = AtomicBox(LockedState(OutputConsumptionState(rawValue: 0)))
76+
#endif
6777
self.consoleBehavior = consoleBehavior
6878
}
6979
#else
@@ -79,7 +89,11 @@ public final class Execution<
7989
self.error = error
8090
self.outputPipe = outputPipe
8191
self.errorPipe = errorPipe
82-
self.outputConsumptionState = Atomic(0)
92+
#if canImport(Synchronization)
93+
self.outputConsumptionState = AtomicBox(Atomic(0))
94+
#else
95+
self.outputConsumptionState = AtomicBox(LockedState(OutputConsumptionState(rawValue: 0)))
96+
#endif
8397
}
8498
#endif // os(Windows)
8599
}
@@ -95,11 +109,10 @@ extension Execution where Output == SequenceOutput {
95109
/// via pipe under the hood and each pipe can only be consumed once.
96110
public var standardOutput: some AsyncSequence<SequenceOutput.Buffer, any Swift.Error> {
97111
let consumptionState = self.outputConsumptionState.bitwiseXor(
98-
OutputConsumptionState.standardOutputConsumed.rawValue,
99-
ordering: .relaxed
100-
).newValue
112+
OutputConsumptionState.standardOutputConsumed,
113+
)
101114

102-
guard OutputConsumptionState(rawValue: consumptionState).contains(.standardOutputConsumed),
115+
guard consumptionState.contains(.standardOutputConsumed),
103116
let fd = self.outputPipe.readFileDescriptor
104117
else {
105118
fatalError("The standard output has already been consumed")
@@ -119,11 +132,10 @@ extension Execution where Error == SequenceOutput {
119132
/// via pipe under the hood and each pipe can only be consumed once.
120133
public var standardError: some AsyncSequence<SequenceOutput.Buffer, any Swift.Error> {
121134
let consumptionState = self.outputConsumptionState.bitwiseXor(
122-
OutputConsumptionState.standardErrorConsumed.rawValue,
123-
ordering: .relaxed
124-
).newValue
135+
OutputConsumptionState.standardOutputConsumed,
136+
)
125137

126-
guard OutputConsumptionState(rawValue: consumptionState).contains(.standardErrorConsumed),
138+
guard consumptionState.contains(.standardErrorConsumed),
127139
let fd = self.errorPipe.readFileDescriptor
128140
else {
129141
fatalError("The standard output has already been consumed")

0 commit comments

Comments
 (0)