Skip to content

Commit 6eb91f6

Browse files
authored
Call spawn() on a background thread because it might block (#158)
* Call spawn() on a background thread because it might block Introduce runOnBackgroundThread() to run closures on a background thread without blocking the Swift Concurrency thread pool * Use pthread_create() instead of DispatchQueue for runOnBackgroundThread() on Darwin * Enable runOnBackgroundThread for spawn() on Windows * Disable testWriteToClosedPipe() and testReadFromClosedPipe() because we can't safely write to / read from a closed fd
1 parent 9ea5cd9 commit 6eb91f6

File tree

10 files changed

+574
-207
lines changed

10 files changed

+574
-207
lines changed

Sources/Subprocess/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ add_library(Subprocess
1414
Buffer.swift
1515
Error.swift
1616
Teardown.swift
17+
Thread.swift
1718
Result.swift
1819
IO/Output.swift
1920
IO/Input.swift

Sources/Subprocess/Configuration.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public struct Configuration: Sendable {
9393
isolation: isolated (any Actor)? = #isolation,
9494
_ body: ((Execution, consuming IOChannel?, consuming IOChannel?, consuming IOChannel?) async throws -> Result)
9595
) async throws -> ExecutionResult<Result> {
96-
let spawnResults = try self.spawn(
96+
let spawnResults = try await self.spawn(
9797
withInput: input,
9898
outputPipe: output,
9999
errorPipe: error
@@ -683,7 +683,11 @@ internal struct IODescriptor: ~Copyable {
683683
#endif
684684

685685
internal var closeWhenDone: Bool
686+
#if canImport(WinSDK)
687+
internal nonisolated(unsafe) let descriptor: Descriptor
688+
#else
686689
internal let descriptor: Descriptor
690+
#endif
687691

688692
internal init(
689693
_ descriptor: Descriptor,

Sources/Subprocess/IO/AsyncIO+Windows.swift

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ final class AsyncIO: @unchecked Sendable {
4343
) rethrows -> ResultType
4444
}
4545

46-
private final class MonitorThreadContext {
46+
private struct MonitorThreadContext: @unchecked Sendable {
4747
let ioCompletionPort: HANDLE
4848

4949
init(ioCompletionPort: HANDLE) {
@@ -61,9 +61,9 @@ final class AsyncIO: @unchecked Sendable {
6161
var maybeSetupError: SubprocessError? = nil
6262
// Create the the completion port
6363
guard
64-
let port = CreateIoCompletionPort(
64+
let ioCompletionPort = CreateIoCompletionPort(
6565
INVALID_HANDLE_VALUE, nil, 0, 0
66-
), port != INVALID_HANDLE_VALUE
66+
), ioCompletionPort != INVALID_HANDLE_VALUE
6767
else {
6868
let error = SubprocessError(
6969
code: .init(.asyncIOFailed("CreateIoCompletionPort failed")),
@@ -73,17 +73,12 @@ final class AsyncIO: @unchecked Sendable {
7373
self.monitorThread = .failure(error)
7474
return
7575
}
76-
self.ioCompletionPort = .success(port)
76+
self.ioCompletionPort = .success(ioCompletionPort)
7777
// Create monitor thread
78-
let threadContext = MonitorThreadContext(ioCompletionPort: port)
79-
let threadContextPtr = Unmanaged.passRetained(threadContext)
80-
/// Microsoft documentation for `CreateThread` states:
81-
/// > A thread in an executable that calls the C run-time library (CRT)
82-
/// > should use the _beginthreadex and _endthreadex functions for
83-
/// > thread management rather than CreateThread and ExitThread
84-
let threadHandleValue = _beginthreadex(
85-
nil, 0,
86-
{ args in
78+
let context = MonitorThreadContext(ioCompletionPort: ioCompletionPort)
79+
let threadHandle: HANDLE
80+
do {
81+
threadHandle = try begin_thread_x {
8782
func reportError(_ error: SubprocessError) {
8883
let continuations = _registration.withLock { store in
8984
return store.values
@@ -93,9 +88,6 @@ final class AsyncIO: @unchecked Sendable {
9388
}
9489
}
9590

96-
let unmanaged = Unmanaged<MonitorThreadContext>.fromOpaque(args!)
97-
let context = unmanaged.takeRetainedValue()
98-
9991
// Monitor loop
10092
while true {
10193
var bytesTransferred: DWORD = 0
@@ -149,16 +141,13 @@ final class AsyncIO: @unchecked Sendable {
149141
}
150142
continuation?.yield(bytesTransferred)
151143
}
144+
152145
return 0
153-
}, threadContextPtr.toOpaque(), 0, nil)
154-
guard threadHandleValue > 0,
155-
let threadHandle = HANDLE(bitPattern: threadHandleValue)
156-
else {
157-
// _beginthreadex uses errno instead of GetLastError()
158-
let capturedError = _subprocess_windows_get_errno()
146+
}
147+
} catch let underlyingError {
159148
let error = SubprocessError(
160-
code: .init(.asyncIOFailed("_beginthreadex failed")),
161-
underlyingError: .init(rawValue: capturedError)
149+
code: .init(.asyncIOFailed("Failed to create monitor thread")),
150+
underlyingError: underlyingError
162151
)
163152
self.monitorThread = .failure(error)
164153
return

Sources/Subprocess/Platforms/Subprocess+Darwin.swift

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,24 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
158158

159159
// MARK: - Spawn
160160
extension Configuration {
161+
// @unchecked Sendable because we need to capture UnsafePointers
162+
// to send to another thread. While UnsafePointers are not
163+
// Sendable, we are not mutating them -- we only need these type
164+
// for C interface.
165+
internal struct SpawnContext: @unchecked Sendable {
166+
let fileActions: posix_spawn_file_actions_t?
167+
let spawnAttributes: posix_spawnattr_t?
168+
let argv: [UnsafeMutablePointer<CChar>?]
169+
let env: [UnsafeMutablePointer<CChar>?]
170+
let uidPtr: UnsafeMutablePointer<uid_t>?
171+
let gidPtr: UnsafeMutablePointer<gid_t>?
172+
}
173+
161174
internal func spawn(
162175
withInput inputPipe: consuming CreatedPipe,
163176
outputPipe: consuming CreatedPipe,
164177
errorPipe: consuming CreatedPipe
165-
) throws -> SpawnResult {
178+
) async throws -> SpawnResult {
166179
// Instead of checking if every possible executable path
167180
// is valid, spawn each directly and catch ENOENT
168181
let possiblePaths = self.executable.possibleExecutablePaths(
@@ -172,7 +185,7 @@ extension Configuration {
172185
var outputPipeBox: CreatedPipe? = consume outputPipe
173186
var errorPipeBox: CreatedPipe? = consume errorPipe
174187

175-
return try self.preSpawn { args throws -> SpawnResult in
188+
return try await self.preSpawn { args throws -> SpawnResult in
176189
let (env, uidPtr, gidPtr, supplementaryGroups) = args
177190
var _inputPipe = inputPipeBox.take()!
178191
var _outputPipe = outputPipeBox.take()!
@@ -186,8 +199,6 @@ extension Configuration {
186199
let errorWriteFileDescriptor: IODescriptor? = _errorPipe.writeFileDescriptor()
187200

188201
for possibleExecutablePath in possiblePaths {
189-
var pid: pid_t = 0
190-
191202
// Setup Arguments
192203
let argv: [UnsafeMutablePointer<CChar>?] = self.arguments.createArgs(
193204
withExecutablePath: possibleExecutablePath
@@ -394,21 +405,35 @@ extension Configuration {
394405
}
395406

396407
// Spawn
397-
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
398-
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
399-
return _subprocess_spawn(
400-
&pid,
401-
exePath,
402-
&fileActions,
403-
&spawnAttributes,
404-
argv,
405-
env,
406-
uidPtr,
407-
gidPtr,
408-
Int32(supplementaryGroups?.count ?? 0),
409-
sgroups?.baseAddress,
410-
self.platformOptions.createSession ? 1 : 0
411-
)
408+
let spawnContext = SpawnContext(
409+
fileActions: fileActions,
410+
spawnAttributes: spawnAttributes,
411+
argv: argv,
412+
env: env,
413+
uidPtr: uidPtr,
414+
gidPtr: gidPtr
415+
)
416+
let (spawnError, pid) = try await runOnBackgroundThread {
417+
return possibleExecutablePath.withCString { exePath in
418+
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
419+
var pid: pid_t = 0
420+
var _fileActions = spawnContext.fileActions
421+
var _spawnAttributes = spawnContext.spawnAttributes
422+
let rc = _subprocess_spawn(
423+
&pid,
424+
exePath,
425+
&_fileActions,
426+
&_spawnAttributes,
427+
spawnContext.argv,
428+
spawnContext.env,
429+
spawnContext.uidPtr,
430+
spawnContext.gidPtr,
431+
Int32(supplementaryGroups?.count ?? 0),
432+
sgroups?.baseAddress,
433+
self.platformOptions.createSession ? 1 : 0
434+
)
435+
return (rc, pid)
436+
}
412437
}
413438
}
414439
// Spawn error

Sources/Subprocess/Platforms/Subprocess+Linux.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,9 @@ private func monitorThreadFunc(context: MonitorThreadContext) {
225225
repeating: epoll_event(events: 0, data: epoll_data(fd: 0)),
226226
count: 256
227227
)
228-
var waitMask = sigset_t();
229-
sigemptyset(&waitMask);
230-
sigaddset(&waitMask, SIGCHLD);
228+
var waitMask = sigset_t()
229+
sigemptyset(&waitMask)
230+
sigaddset(&waitMask, SIGCHLD)
231231
// Enter the monitor loop
232232
monitorLoop: while true {
233233
let eventCount = epoll_pwait(

Sources/Subprocess/Platforms/Subprocess+Unix.swift

Lines changed: 46 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,8 @@ extension Configuration {
350350
)
351351

352352
internal func preSpawn<Result: ~Copyable>(
353-
_ work: (PreSpawnArgs) throws -> Result
354-
) throws -> Result {
353+
_ work: (PreSpawnArgs) async throws -> Result
354+
) async throws -> Result {
355355
// Prepare environment
356356
let env = self.environment.createEnv()
357357
defer {
@@ -378,7 +378,7 @@ extension Configuration {
378378
if let groupsValue = self.platformOptions.supplementaryGroups {
379379
supplementaryGroups = groupsValue
380380
}
381-
return try work(
381+
return try await work(
382382
(
383383
env: env,
384384
uidPtr: uidPtr,
@@ -415,11 +415,24 @@ internal typealias PlatformFileDescriptor = CInt
415415

416416
#if !canImport(Darwin)
417417
extension Configuration {
418+
419+
// @unchecked Sendable because we need to capture UnsafePointers
420+
// to send to another thread. While UnsafePointers are not
421+
// Sendable, we are not mutating them -- we only need these type
422+
// for C interface.
423+
internal struct SpawnContext: @unchecked Sendable {
424+
let argv: [UnsafeMutablePointer<CChar>?]
425+
let env: [UnsafeMutablePointer<CChar>?]
426+
let uidPtr: UnsafeMutablePointer<uid_t>?
427+
let gidPtr: UnsafeMutablePointer<gid_t>?
428+
let processGroupIDPtr: UnsafeMutablePointer<gid_t>?
429+
}
430+
418431
internal func spawn(
419432
withInput inputPipe: consuming CreatedPipe,
420433
outputPipe: consuming CreatedPipe,
421434
errorPipe: consuming CreatedPipe
422-
) throws -> SpawnResult {
435+
) async throws -> SpawnResult {
423436
// Ensure the waiter thread is running.
424437
#if os(Linux) || os(Android)
425438
_setupMonitorSignalHandler()
@@ -434,7 +447,7 @@ extension Configuration {
434447
var outputPipeBox: CreatedPipe? = consume outputPipe
435448
var errorPipeBox: CreatedPipe? = consume errorPipe
436449

437-
return try self.preSpawn { args throws -> SpawnResult in
450+
return try await self.preSpawn { args throws -> SpawnResult in
438451
let (env, uidPtr, gidPtr, supplementaryGroups) = args
439452

440453
var _inputPipe = inputPipeBox.take()!
@@ -472,27 +485,34 @@ extension Configuration {
472485
]
473486

474487
// Spawn
475-
var pid: pid_t = 0
476-
var processDescriptor: PlatformFileDescriptor = -1
477-
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
478-
return (self.workingDirectory?.string).withOptionalCString { workingDir in
479-
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
480-
return fileDescriptors.withUnsafeBufferPointer { fds in
481-
return _subprocess_fork_exec(
482-
&pid,
483-
&processDescriptor,
484-
exePath,
485-
workingDir,
486-
fds.baseAddress!,
487-
argv,
488-
env,
489-
uidPtr,
490-
gidPtr,
491-
processGroupIDPtr,
492-
CInt(supplementaryGroups?.count ?? 0),
493-
sgroups?.baseAddress,
494-
self.platformOptions.createSession ? 1 : 0
495-
)
488+
let spawnContext = SpawnContext(
489+
argv: argv, env: env, uidPtr: uidPtr, gidPtr: gidPtr, processGroupIDPtr: processGroupIDPtr
490+
)
491+
let (pid, processDescriptor, spawnError) = try await runOnBackgroundThread {
492+
return possibleExecutablePath.withCString { exePath in
493+
return (self.workingDirectory?.string).withOptionalCString { workingDir in
494+
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
495+
return fileDescriptors.withUnsafeBufferPointer { fds in
496+
var pid: pid_t = 0
497+
var processDescriptor: PlatformFileDescriptor = -1
498+
499+
let rc = _subprocess_fork_exec(
500+
&pid,
501+
&processDescriptor,
502+
exePath,
503+
workingDir,
504+
fds.baseAddress!,
505+
spawnContext.argv,
506+
spawnContext.env,
507+
spawnContext.uidPtr,
508+
spawnContext.gidPtr,
509+
spawnContext.processGroupIDPtr,
510+
CInt(supplementaryGroups?.count ?? 0),
511+
sgroups?.baseAddress,
512+
self.platformOptions.createSession ? 1 : 0
513+
)
514+
return (pid, processDescriptor, rc)
515+
}
496516
}
497517
}
498518
}
@@ -658,51 +678,6 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
658678
return self.description(withIndent: 0)
659679
}
660680
}
661-
662-
// Special keys used in Error's user dictionary
663-
extension String {
664-
static let debugDescriptionErrorKey = "DebugDescription"
665-
}
666-
667-
internal func pthread_create(_ body: @Sendable @escaping () -> ()) throws(SubprocessError.UnderlyingError) -> pthread_t {
668-
final class Context {
669-
let body: @Sendable () -> ()
670-
init(body: @Sendable @escaping () -> Void) {
671-
self.body = body
672-
}
673-
}
674-
#if canImport(Glibc) || canImport(Musl)
675-
func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? {
676-
(Unmanaged<AnyObject>.fromOpaque(context!).takeRetainedValue() as! Context).body()
677-
return nil
678-
}
679-
#elseif canImport(Bionic)
680-
func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
681-
(Unmanaged<AnyObject>.fromOpaque(context).takeRetainedValue() as! Context).body()
682-
return context
683-
}
684-
#endif
685-
#if (os(Linux) && canImport(Glibc)) || canImport(Bionic)
686-
var thread = pthread_t()
687-
#else
688-
var thread: pthread_t?
689-
#endif
690-
let rc = pthread_create(
691-
&thread,
692-
nil,
693-
proc,
694-
Unmanaged.passRetained(Context(body: body)).toOpaque()
695-
)
696-
if rc != 0 {
697-
throw SubprocessError.UnderlyingError(rawValue: rc)
698-
}
699-
#if (os(Linux) && canImport(Glibc)) || canImport(Bionic)
700-
return thread
701-
#else
702-
return thread!
703-
#endif
704-
}
705-
706681
#endif // !canImport(Darwin)
707682

708683
extension ProcessIdentifier {

0 commit comments

Comments
 (0)