Skip to content

Commit 4255e66

Browse files
committed
Use pthread_create() instead of DispatchQueue for runOnBackgroundThread() on Darwin
1 parent 63ddb46 commit 4255e66

File tree

2 files changed

+135
-94
lines changed

2 files changed

+135
-94
lines changed

Sources/Subprocess/Platforms/Subprocess+Unix.swift

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -415,11 +415,24 @@ internal typealias PlatformFileDescriptor = CInt
415415

416416
#if !canImport(Darwin)
417417
extension Configuration {
418+
419+
// @unchecked Sendable because we need to capture UnsafePointers
420+
// to send to another thread. While UnsafePointers are not
421+
// Sendable, we are not mutating them -- we only need these type
422+
// for C interface.
423+
internal struct SpawnContext: @unchecked Sendable {
424+
let argv: [UnsafeMutablePointer<CChar>?]
425+
let env: [UnsafeMutablePointer<CChar>?]
426+
let uidPtr: UnsafeMutablePointer<uid_t>?
427+
let gidPtr: UnsafeMutablePointer<gid_t>?
428+
let processGroupIDPtr: UnsafeMutablePointer<gid_t>?
429+
}
430+
418431
internal func spawn(
419432
withInput inputPipe: consuming CreatedPipe,
420433
outputPipe: consuming CreatedPipe,
421434
errorPipe: consuming CreatedPipe
422-
) throws -> SpawnResult {
435+
) async throws -> SpawnResult {
423436
// Ensure the waiter thread is running.
424437
#if os(Linux) || os(Android)
425438
_setupMonitorSignalHandler()
@@ -434,7 +447,7 @@ extension Configuration {
434447
var outputPipeBox: CreatedPipe? = consume outputPipe
435448
var errorPipeBox: CreatedPipe? = consume errorPipe
436449

437-
return try self.preSpawn { args throws -> SpawnResult in
450+
return try await self.preSpawn { args throws -> SpawnResult in
438451
let (env, uidPtr, gidPtr, supplementaryGroups) = args
439452

440453
var _inputPipe = inputPipeBox.take()!
@@ -472,27 +485,34 @@ extension Configuration {
472485
]
473486

474487
// Spawn
475-
var pid: pid_t = 0
476-
var processDescriptor: PlatformFileDescriptor = -1
477-
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
478-
return (self.workingDirectory?.string).withOptionalCString { workingDir in
479-
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
480-
return fileDescriptors.withUnsafeBufferPointer { fds in
481-
return _subprocess_fork_exec(
482-
&pid,
483-
&processDescriptor,
484-
exePath,
485-
workingDir,
486-
fds.baseAddress!,
487-
argv,
488-
env,
489-
uidPtr,
490-
gidPtr,
491-
processGroupIDPtr,
492-
CInt(supplementaryGroups?.count ?? 0),
493-
sgroups?.baseAddress,
494-
self.platformOptions.createSession ? 1 : 0
495-
)
488+
let spawnContext = SpawnContext(
489+
argv: argv, env: env, uidPtr: uidPtr, gidPtr: gidPtr, processGroupIDPtr: processGroupIDPtr
490+
)
491+
let (pid, processDescriptor, spawnError) = try await runOnBackgroundThread {
492+
return possibleExecutablePath.withCString { exePath in
493+
return (self.workingDirectory?.string).withOptionalCString { workingDir in
494+
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
495+
return fileDescriptors.withUnsafeBufferPointer { fds in
496+
var pid: pid_t = 0
497+
var processDescriptor: PlatformFileDescriptor = -1
498+
499+
let rc = _subprocess_fork_exec(
500+
&pid,
501+
&processDescriptor,
502+
exePath,
503+
workingDir,
504+
fds.baseAddress!,
505+
spawnContext.argv,
506+
spawnContext.env,
507+
spawnContext.uidPtr,
508+
spawnContext.gidPtr,
509+
spawnContext.processGroupIDPtr,
510+
CInt(supplementaryGroups?.count ?? 0),
511+
sgroups?.baseAddress,
512+
self.platformOptions.createSession ? 1 : 0
513+
)
514+
return (pid, processDescriptor, rc)
515+
}
496516
}
497517
}
498518
}

Sources/Subprocess/Thread.swift

Lines changed: 92 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#if canImport(Darwin)
1313
import Darwin
14+
import os
1415
#elseif canImport(Glibc)
1516
import Glibc
1617
#elseif canImport(Bionic)
@@ -28,24 +29,6 @@ import _SubprocessCShims
2829
import Synchronization
2930
#endif
3031

31-
#if canImport(Darwin)
32-
internal func runOnBackgroundThread<Result>(
33-
_ body: @Sendable @escaping () throws -> Result
34-
) async throws -> Result {
35-
let result = try await withCheckedThrowingContinuation { continuation in
36-
// On Darwin, use DispatchQueue directly
37-
DispatchQueue.global().async {
38-
do {
39-
let result = try body()
40-
continuation.resume(returning: result)
41-
} catch {
42-
continuation.resume(throwing: error)
43-
}
44-
}
45-
}
46-
return result
47-
}
48-
#else
4932

5033
#if canImport(WinSDK)
5134
private typealias MutexType = CRITICAL_SECTION
@@ -57,6 +40,19 @@ private typealias ConditionType = pthread_cond_t
5740
private typealias ThreadType = pthread_t
5841
#endif
5942

43+
internal func runOnBackgroundThread<Result>(
44+
_ body: @Sendable @escaping () throws -> Result
45+
) async throws -> Result {
46+
// Only executed once
47+
_setupWorkerThread
48+
49+
let result = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Result, any Error>) in
50+
let workItem = BackgroundWorkItem(body, continuation: continuation)
51+
_workQueue.enqueue(workItem)
52+
}
53+
return result
54+
}
55+
6056
private struct BackgroundWorkItem {
6157
private let work: @Sendable () -> Void
6258

@@ -165,12 +161,12 @@ private final class WorkQueue: Sendable {
165161
}
166162

167163
private let _workQueue = WorkQueue()
168-
private let _workQueueShutdownFlag: Atomic<UInt8> = Atomic(0)
164+
private let _workQueueShutdownFlag = AtomicCounter()
169165

170166
// Okay to be unlocked global mutable because this value is only set once like dispatch_once
171167
private nonisolated(unsafe) var _workerThread: Result<ThreadType, any Error> = .failure(SubprocessError(code: .init(.spawnFailed), underlyingError: nil))
172168

173-
private let setupWorkerThread: () = {
169+
private let _setupWorkerThread: () = {
174170
do {
175171
#if canImport(WinSDK)
176172
let workerThread = try begin_thread_x {
@@ -208,7 +204,7 @@ private func _shutdownWorkerThread() {
208204
guard case .success(let thread) = _workerThread else {
209205
return
210206
}
211-
guard _workQueueShutdownFlag.add(1, ordering: .sequentiallyConsistent).newValue == 1 else {
207+
guard _workQueueShutdownFlag.addOne() == 1 else {
212208
// We already shutdown this thread
213209
return
214210
}
@@ -227,65 +223,41 @@ private func _shutdownWorkerThread() {
227223
_workQueue.waitCondition.deallocate()
228224
}
229225

230-
internal func runOnBackgroundThread<Result>(
231-
_ body: @Sendable @escaping () throws -> Result
232-
) async throws -> Result {
233-
// Only executed once
234-
setupWorkerThread
226+
// MARK: - AtomicCounter
235227

236-
let result = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Result, any Error>) in
237-
let workItem = BackgroundWorkItem(body, continuation: continuation)
238-
_workQueue.enqueue(workItem)
239-
}
240-
return result
241-
}
228+
#if canImport(Darwin)
229+
// Unfortunately on Darwin we can unconditionally use Atomic since it requires macOS 15
230+
internal struct AtomicCounter: ~Copyable {
231+
private let storage: OSAllocatedUnfairLock<UInt8>
242232

243-
#endif
233+
internal init() {
234+
self.storage = .init(initialState: 0)
235+
}
244236

245-
// MARK: - Thread Creation Primitives
246-
#if canImport(Glibc) || canImport(Bionic) || canImport(Musl)
247-
internal func pthread_create(
248-
_ body: @Sendable @escaping () -> ()
249-
) throws(SubprocessError.UnderlyingError) -> pthread_t {
250-
final class Context {
251-
let body: @Sendable () -> ()
252-
init(body: @Sendable @escaping () -> Void) {
253-
self.body = body
237+
internal func addOne() -> UInt8 {
238+
return self.storage.withLock {
239+
$0 += 1
240+
return $0
254241
}
255242
}
256-
#if canImport(Glibc) || canImport(Musl)
257-
func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? {
258-
(Unmanaged<AnyObject>.fromOpaque(context!).takeRetainedValue() as! Context).body()
259-
return nil
260-
}
261-
#elseif canImport(Bionic)
262-
func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
263-
(Unmanaged<AnyObject>.fromOpaque(context).takeRetainedValue() as! Context).body()
264-
return context
243+
}
244+
#else
245+
internal struct AtomicCounter: ~Copyable {
246+
247+
private let storage: Atomic<UInt8>
248+
249+
internal init() {
250+
self.storage = Atomic(0)
265251
}
266-
#endif
267-
#if canImport(Glibc) || canImport(Bionic)
268-
var thread = pthread_t()
269-
#else
270-
var thread: pthread_t?
271-
#endif
272-
let rc = pthread_create(
273-
&thread,
274-
nil,
275-
proc,
276-
Unmanaged.passRetained(Context(body: body)).toOpaque()
277-
)
278-
if rc != 0 {
279-
throw SubprocessError.UnderlyingError(rawValue: rc)
252+
253+
internal func addOne() -> UInt8 {
254+
return self.storage.add(1, ordering: .sequentiallyConsistent).newValue
280255
}
281-
#if canImport(Glibc) || canImport(Bionic)
282-
return thread
283-
#else
284-
return thread!
285-
#endif
286256
}
257+
#endif
287258

288-
#elseif canImport(WinSDK)
259+
// MARK: - Thread Creation Primitives
260+
#if canImport(WinSDK)
289261
/// Microsoft documentation for `CreateThread` states:
290262
/// > A thread in an executable that calls the C run-time library (CRT)
291263
/// > should use the _beginthreadex and _endthreadex functions for
@@ -321,5 +293,54 @@ internal func begin_thread_x(
321293

322294
return threadHandle
323295
}
296+
#else
297+
298+
internal func pthread_create(
299+
_ body: @Sendable @escaping () -> ()
300+
) throws(SubprocessError.UnderlyingError) -> pthread_t {
301+
final class Context {
302+
let body: @Sendable () -> ()
303+
init(body: @Sendable @escaping () -> Void) {
304+
self.body = body
305+
}
306+
}
307+
#if canImport(Darwin)
308+
func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
309+
(Unmanaged<AnyObject>.fromOpaque(context).takeRetainedValue() as! Context).body()
310+
return context
311+
}
312+
#elseif canImport(Glibc) || canImport(Musl)
313+
func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? {
314+
(Unmanaged<AnyObject>.fromOpaque(context!).takeRetainedValue() as! Context).body()
315+
return context
316+
}
317+
#elseif canImport(Bionic)
318+
func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
319+
(Unmanaged<AnyObject>.fromOpaque(context).takeRetainedValue() as! Context).body()
320+
return context
321+
}
322+
#endif
323+
324+
#if canImport(Glibc) || canImport(Bionic)
325+
var thread = pthread_t()
326+
#else
327+
var thread: pthread_t?
324328
#endif
329+
let rc = pthread_create(
330+
&thread,
331+
nil,
332+
proc,
333+
Unmanaged.passRetained(Context(body: body)).toOpaque()
334+
)
335+
if rc != 0 {
336+
throw SubprocessError.UnderlyingError(rawValue: rc)
337+
}
338+
#if canImport(Glibc) || canImport(Bionic)
339+
return thread
340+
#else
341+
return thread!
342+
#endif
343+
}
344+
345+
#endif // canImport(WinSDK)
325346

0 commit comments

Comments
 (0)