Skip to content

Commit 87269bf

Browse files
committed
Call spawn() on a background thread because it might block
Introduce runOnBackgroundThread() to run closures on a background thread without blocking the Swift Concurrency thread pool
1 parent 3cc99d0 commit 87269bf

File tree

9 files changed

+488
-165
lines changed

9 files changed

+488
-165
lines changed

Sources/Subprocess/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ add_library(Subprocess
1414
Buffer.swift
1515
Error.swift
1616
Teardown.swift
17+
Thread.swift
1718
Result.swift
1819
IO/Output.swift
1920
IO/Input.swift

Sources/Subprocess/Configuration.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public struct Configuration: Sendable {
6868
isolation: isolated (any Actor)? = #isolation,
6969
_ body: ((Execution, consuming IOChannel?, consuming IOChannel?, consuming IOChannel?) async throws -> Result)
7070
) async throws -> ExecutionResult<Result> {
71-
let spawnResults = try self.spawn(
71+
let spawnResults = try await self.spawn(
7272
withInput: input,
7373
outputPipe: output,
7474
errorPipe: error

Sources/Subprocess/IO/AsyncIO+Windows.swift

Lines changed: 67 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ final class AsyncIO: @unchecked Sendable {
4040
) throws -> ResultType) rethrows -> ResultType
4141
}
4242

43-
private final class MonitorThreadContext {
43+
private struct MonitorThreadContext: @unchecked Sendable {
4444
let ioCompletionPort: HANDLE
4545

4646
init(ioCompletionPort: HANDLE) {
@@ -57,9 +57,9 @@ final class AsyncIO: @unchecked Sendable {
5757
internal init() {
5858
var maybeSetupError: SubprocessError? = nil
5959
// Create the the completion port
60-
guard let port = CreateIoCompletionPort(
60+
guard let ioCompletionPort = CreateIoCompletionPort(
6161
INVALID_HANDLE_VALUE, nil, 0, 0
62-
), port != INVALID_HANDLE_VALUE else {
62+
), ioCompletionPort != INVALID_HANDLE_VALUE else {
6363
let error = SubprocessError(
6464
code: .init(.asyncIOFailed("CreateIoCompletionPort failed")),
6565
underlyingError: .init(rawValue: GetLastError())
@@ -68,89 +68,81 @@ final class AsyncIO: @unchecked Sendable {
6868
self.monitorThread = .failure(error)
6969
return
7070
}
71-
self.ioCompletionPort = .success(port)
71+
self.ioCompletionPort = .success(ioCompletionPort)
7272
// Create monitor thread
73-
let threadContext = MonitorThreadContext(ioCompletionPort: port)
74-
let threadContextPtr = Unmanaged.passRetained(threadContext)
75-
/// Microsoft documentation for `CreateThread` states:
76-
/// > A thread in an executable that calls the C run-time library (CRT)
77-
/// > should use the _beginthreadex and _endthreadex functions for
78-
/// > thread management rather than CreateThread and ExitThread
79-
let threadHandleValue = _beginthreadex(nil, 0, { args in
80-
func reportError(_ error: SubprocessError) {
81-
let continuations = _registration.withLock { store in
82-
return store.values
83-
}
84-
for continuation in continuations {
85-
continuation.finish(throwing: error)
73+
let context = MonitorThreadContext(ioCompletionPort: ioCompletionPort)
74+
let threadHandle: HANDLE
75+
do {
76+
threadHandle = try begin_thread_x {
77+
func reportError(_ error: SubprocessError) {
78+
let continuations = _registration.withLock { store in
79+
return store.values
80+
}
81+
for continuation in continuations {
82+
continuation.finish(throwing: error)
83+
}
8684
}
87-
}
8885

89-
let unmanaged = Unmanaged<MonitorThreadContext>.fromOpaque(args!)
90-
let context = unmanaged.takeRetainedValue()
91-
92-
// Monitor loop
93-
while true {
94-
var bytesTransferred: DWORD = 0
95-
var targetFileDescriptor: UInt64 = 0
96-
var overlapped: LPOVERLAPPED? = nil
97-
98-
let monitorResult = GetQueuedCompletionStatus(
99-
context.ioCompletionPort,
100-
&bytesTransferred,
101-
&targetFileDescriptor,
102-
&overlapped,
103-
INFINITE
104-
)
105-
if !monitorResult {
106-
let lastError = GetLastError()
107-
if lastError == ERROR_BROKEN_PIPE {
108-
// We finished reading the handle. Signal EOF by
109-
// finishing the stream.
110-
// NOTE: here we deliberately leave now unused continuation
111-
// in the store. Windows does not offer an API to remove a
112-
// HANDLE from an IOCP port, therefore we leave the registration
113-
// to signify the HANDLE has already been resisted.
114-
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
115-
if let continuation = store[targetFileDescriptor] {
116-
return continuation
86+
// Monitor loop
87+
while true {
88+
var bytesTransferred: DWORD = 0
89+
var targetFileDescriptor: UInt64 = 0
90+
var overlapped: LPOVERLAPPED? = nil
91+
92+
let monitorResult = GetQueuedCompletionStatus(
93+
context.ioCompletionPort,
94+
&bytesTransferred,
95+
&targetFileDescriptor,
96+
&overlapped,
97+
INFINITE
98+
)
99+
if !monitorResult {
100+
let lastError = GetLastError()
101+
if lastError == ERROR_BROKEN_PIPE {
102+
// We finished reading the handle. Signal EOF by
103+
// finishing the stream.
104+
// NOTE: here we deliberately leave now unused continuation
105+
// in the store. Windows does not offer an API to remove a
106+
// HANDLE from an IOCP port, therefore we leave the registration
107+
// to signify the HANDLE has already been resisted.
108+
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
109+
if let continuation = store[targetFileDescriptor] {
110+
return continuation
111+
}
112+
return nil
117113
}
118-
return nil
114+
continuation?.finish()
115+
continue
116+
} else {
117+
let error = SubprocessError(
118+
code: .init(.asyncIOFailed("GetQueuedCompletionStatus failed")),
119+
underlyingError: .init(rawValue: lastError)
120+
)
121+
reportError(error)
122+
break
119123
}
120-
continuation?.finish()
121-
continue
122-
} else {
123-
let error = SubprocessError(
124-
code: .init(.asyncIOFailed("GetQueuedCompletionStatus failed")),
125-
underlyingError: .init(rawValue: lastError)
126-
)
127-
reportError(error)
128-
break
129124
}
130-
}
131125

132-
// Breakout the monitor loop if we received shutdown from the shutdownFD
133-
if targetFileDescriptor == shutdownPort {
134-
break
135-
}
136-
// Notify the continuations
137-
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
138-
if let continuation = store[targetFileDescriptor] {
139-
return continuation
126+
// Breakout the monitor loop if we received shutdown from the shutdownFD
127+
if targetFileDescriptor == shutdownPort {
128+
break
140129
}
141-
return nil
130+
// Notify the continuations
131+
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
132+
if let continuation = store[targetFileDescriptor] {
133+
return continuation
134+
}
135+
return nil
136+
}
137+
continuation?.yield(bytesTransferred)
142138
}
143-
continuation?.yield(bytesTransferred)
139+
140+
return 0
144141
}
145-
return 0
146-
}, threadContextPtr.toOpaque(), 0, nil)
147-
guard threadHandleValue > 0,
148-
let threadHandle = HANDLE(bitPattern: threadHandleValue) else {
149-
// _beginthreadex uses errno instead of GetLastError()
150-
let capturedError = _subprocess_windows_get_errno()
142+
} catch let underlyingError {
151143
let error = SubprocessError(
152-
code: .init(.asyncIOFailed("_beginthreadex failed")),
153-
underlyingError: .init(rawValue: capturedError)
144+
code: .init(.asyncIOFailed("Failed to create monitor thread")),
145+
underlyingError: underlyingError
154146
)
155147
self.monitorThread = .failure(error)
156148
return

Sources/Subprocess/Platforms/Subprocess+Darwin.swift

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,24 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
177177

178178
// MARK: - Spawn
179179
extension Configuration {
180+
// @unchecked Sendable because we need to capture UnsafePointers
181+
// to send to another thread. While UnsafePointers are not
182+
// Sendable, we are not mutating them -- we only need these type
183+
// for C interface.
184+
internal struct SpawnContext: @unchecked Sendable {
185+
let fileActions: posix_spawn_file_actions_t?
186+
let spawnAttributes: posix_spawnattr_t?
187+
let argv: [UnsafeMutablePointer<CChar>?]
188+
let env: [UnsafeMutablePointer<CChar>?]
189+
let uidPtr: UnsafeMutablePointer<uid_t>?
190+
let gidPtr: UnsafeMutablePointer<gid_t>?
191+
}
192+
180193
internal func spawn(
181194
withInput inputPipe: consuming CreatedPipe,
182195
outputPipe: consuming CreatedPipe,
183196
errorPipe: consuming CreatedPipe
184-
) throws -> SpawnResult {
197+
) async throws -> SpawnResult {
185198
// Instead of checking if every possible executable path
186199
// is valid, spawn each directly and catch ENOENT
187200
let possiblePaths = self.executable.possibleExecutablePaths(
@@ -191,7 +204,7 @@ extension Configuration {
191204
var outputPipeBox: CreatedPipe? = consume outputPipe
192205
var errorPipeBox: CreatedPipe? = consume errorPipe
193206

194-
return try self.preSpawn { args throws -> SpawnResult in
207+
return try await self.preSpawn { args throws -> SpawnResult in
195208
let (env, uidPtr, gidPtr, supplementaryGroups) = args
196209
var _inputPipe = inputPipeBox.take()!
197210
var _outputPipe = outputPipeBox.take()!
@@ -205,8 +218,6 @@ extension Configuration {
205218
let errorWriteFileDescriptor: IODescriptor? = _errorPipe.writeFileDescriptor()
206219

207220
for possibleExecutablePath in possiblePaths {
208-
var pid: pid_t = 0
209-
210221
// Setup Arguments
211222
let argv: [UnsafeMutablePointer<CChar>?] = self.arguments.createArgs(
212223
withExecutablePath: possibleExecutablePath
@@ -413,22 +424,36 @@ extension Configuration {
413424
}
414425

415426
// Spawn
416-
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
417-
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
418-
return _subprocess_spawn(
419-
&pid,
420-
exePath,
421-
&fileActions,
422-
&spawnAttributes,
423-
argv,
424-
env,
425-
uidPtr,
426-
gidPtr,
427-
Int32(supplementaryGroups?.count ?? 0),
428-
sgroups?.baseAddress,
429-
self.platformOptions.createSession ? 1 : 0,
430-
self.platformOptions.preExecProcessAction,
431-
)
427+
let spawnContext = SpawnContext(
428+
fileActions: fileActions,
429+
spawnAttributes: spawnAttributes,
430+
argv: argv,
431+
env: env,
432+
uidPtr: uidPtr,
433+
gidPtr: gidPtr
434+
)
435+
let (spawnError, pid) = try await runOnBackgroundThread {
436+
return possibleExecutablePath.withCString { exePath in
437+
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
438+
var pid: pid_t = 0
439+
var _fileActions = spawnContext.fileActions
440+
var _spawnAttributes = spawnContext.spawnAttributes
441+
let rc = _subprocess_spawn(
442+
&pid,
443+
exePath,
444+
&_fileActions,
445+
&_spawnAttributes,
446+
spawnContext.argv,
447+
spawnContext.env,
448+
spawnContext.uidPtr,
449+
spawnContext.gidPtr,
450+
Int32(supplementaryGroups?.count ?? 0),
451+
sgroups?.baseAddress,
452+
self.platformOptions.createSession ? 1 : 0,
453+
self.platformOptions.preExecProcessAction,
454+
)
455+
return (rc, pid)
456+
}
432457
}
433458
}
434459
// Spawn error

0 commit comments

Comments
 (0)