Skip to content

Commit fd53084

Browse files
committed
Call spawn() on a background thread because it might block
Introduce runOnBackgroundThread() to run closures on a background thread without blocking the Swift Concurrency thread pool
1 parent 7dc6e54 commit fd53084

File tree

9 files changed

+486
-163
lines changed

9 files changed

+486
-163
lines changed

Sources/Subprocess/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ add_library(Subprocess
1414
Buffer.swift
1515
Error.swift
1616
Teardown.swift
17+
Thread.swift
1718
Result.swift
1819
IO/Output.swift
1920
IO/Input.swift

Sources/Subprocess/Configuration.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public struct Configuration: Sendable {
6868
isolation: isolated (any Actor)? = #isolation,
6969
_ body: ((Execution, consuming IOChannel?, consuming IOChannel?, consuming IOChannel?) async throws -> Result)
7070
) async throws -> ExecutionResult<Result> {
71-
let spawnResults = try self.spawn(
71+
let spawnResults = try await self.spawn(
7272
withInput: input,
7373
outputPipe: output,
7474
errorPipe: error

Sources/Subprocess/IO/AsyncIO+Windows.swift

Lines changed: 67 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ final class AsyncIO: @unchecked Sendable {
4040
) throws -> ResultType) rethrows -> ResultType
4141
}
4242

43-
private final class MonitorThreadContext {
43+
private struct MonitorThreadContext: @unchecked Sendable {
4444
let ioCompletionPort: HANDLE
4545

4646
init(ioCompletionPort: HANDLE) {
@@ -57,9 +57,9 @@ final class AsyncIO: @unchecked Sendable {
5757
internal init() {
5858
var maybeSetupError: SubprocessError? = nil
5959
// Create the the completion port
60-
guard let port = CreateIoCompletionPort(
60+
guard let ioCompletionPort = CreateIoCompletionPort(
6161
INVALID_HANDLE_VALUE, nil, 0, 0
62-
), port != INVALID_HANDLE_VALUE else {
62+
), ioCompletionPort != INVALID_HANDLE_VALUE else {
6363
let error = SubprocessError(
6464
code: .init(.asyncIOFailed("CreateIoCompletionPort failed")),
6565
underlyingError: .init(rawValue: GetLastError())
@@ -68,89 +68,81 @@ final class AsyncIO: @unchecked Sendable {
6868
self.monitorThread = .failure(error)
6969
return
7070
}
71-
self.ioCompletionPort = .success(port)
71+
self.ioCompletionPort = .success(ioCompletionPort)
7272
// Create monitor thread
73-
let threadContext = MonitorThreadContext(ioCompletionPort: port)
74-
let threadContextPtr = Unmanaged.passRetained(threadContext)
75-
/// Microsoft documentation for `CreateThread` states:
76-
/// > A thread in an executable that calls the C run-time library (CRT)
77-
/// > should use the _beginthreadex and _endthreadex functions for
78-
/// > thread management rather than CreateThread and ExitThread
79-
let threadHandleValue = _beginthreadex(nil, 0, { args in
80-
func reportError(_ error: SubprocessError) {
81-
let continuations = _registration.withLock { store in
82-
return store.values
83-
}
84-
for continuation in continuations {
85-
continuation.finish(throwing: error)
73+
let context = MonitorThreadContext(ioCompletionPort: ioCompletionPort)
74+
let threadHandle: HANDLE
75+
do {
76+
threadHandle = try begin_thread_x {
77+
func reportError(_ error: SubprocessError) {
78+
let continuations = _registration.withLock { store in
79+
return store.values
80+
}
81+
for continuation in continuations {
82+
continuation.finish(throwing: error)
83+
}
8684
}
87-
}
8885

89-
let unmanaged = Unmanaged<MonitorThreadContext>.fromOpaque(args!)
90-
let context = unmanaged.takeRetainedValue()
91-
92-
// Monitor loop
93-
while true {
94-
var bytesTransferred: DWORD = 0
95-
var targetFileDescriptor: UInt64 = 0
96-
var overlapped: LPOVERLAPPED? = nil
97-
98-
let monitorResult = GetQueuedCompletionStatus(
99-
context.ioCompletionPort,
100-
&bytesTransferred,
101-
&targetFileDescriptor,
102-
&overlapped,
103-
INFINITE
104-
)
105-
if !monitorResult {
106-
let lastError = GetLastError()
107-
if lastError == ERROR_BROKEN_PIPE {
108-
// We finished reading the handle. Signal EOF by
109-
// finishing the stream.
110-
// NOTE: here we deliberately leave now unused continuation
111-
// in the store. Windows does not offer an API to remove a
112-
// HANDLE from an IOCP port, therefore we leave the registration
113-
// to signify the HANDLE has already been resisted.
114-
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
115-
if let continuation = store[targetFileDescriptor] {
116-
return continuation
86+
// Monitor loop
87+
while true {
88+
var bytesTransferred: DWORD = 0
89+
var targetFileDescriptor: UInt64 = 0
90+
var overlapped: LPOVERLAPPED? = nil
91+
92+
let monitorResult = GetQueuedCompletionStatus(
93+
context.ioCompletionPort,
94+
&bytesTransferred,
95+
&targetFileDescriptor,
96+
&overlapped,
97+
INFINITE
98+
)
99+
if !monitorResult {
100+
let lastError = GetLastError()
101+
if lastError == ERROR_BROKEN_PIPE {
102+
// We finished reading the handle. Signal EOF by
103+
// finishing the stream.
104+
// NOTE: here we deliberately leave now unused continuation
105+
// in the store. Windows does not offer an API to remove a
106+
// HANDLE from an IOCP port, therefore we leave the registration
107+
// to signify the HANDLE has already been resisted.
108+
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
109+
if let continuation = store[targetFileDescriptor] {
110+
return continuation
111+
}
112+
return nil
117113
}
118-
return nil
114+
continuation?.finish()
115+
continue
116+
} else {
117+
let error = SubprocessError(
118+
code: .init(.asyncIOFailed("GetQueuedCompletionStatus failed")),
119+
underlyingError: .init(rawValue: lastError)
120+
)
121+
reportError(error)
122+
break
119123
}
120-
continuation?.finish()
121-
continue
122-
} else {
123-
let error = SubprocessError(
124-
code: .init(.asyncIOFailed("GetQueuedCompletionStatus failed")),
125-
underlyingError: .init(rawValue: lastError)
126-
)
127-
reportError(error)
128-
break
129124
}
130-
}
131125

132-
// Breakout the monitor loop if we received shutdown from the shutdownFD
133-
if targetFileDescriptor == shutdownPort {
134-
break
135-
}
136-
// Notify the continuations
137-
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
138-
if let continuation = store[targetFileDescriptor] {
139-
return continuation
126+
// Breakout the monitor loop if we received shutdown from the shutdownFD
127+
if targetFileDescriptor == shutdownPort {
128+
break
140129
}
141-
return nil
130+
// Notify the continuations
131+
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
132+
if let continuation = store[targetFileDescriptor] {
133+
return continuation
134+
}
135+
return nil
136+
}
137+
continuation?.yield(bytesTransferred)
142138
}
143-
continuation?.yield(bytesTransferred)
139+
140+
return 0
144141
}
145-
return 0
146-
}, threadContextPtr.toOpaque(), 0, nil)
147-
guard threadHandleValue > 0,
148-
let threadHandle = HANDLE(bitPattern: threadHandleValue) else {
149-
// _beginthreadex uses errno instead of GetLastError()
150-
let capturedError = _subprocess_windows_get_errno()
142+
} catch let underlyingError {
151143
let error = SubprocessError(
152-
code: .init(.asyncIOFailed("_beginthreadex failed")),
153-
underlyingError: .init(rawValue: capturedError)
144+
code: .init(.asyncIOFailed("Failed to create monitor thread")),
145+
underlyingError: underlyingError
154146
)
155147
self.monitorThread = .failure(error)
156148
return

Sources/Subprocess/Platforms/Subprocess+Darwin.swift

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,24 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
153153

154154
// MARK: - Spawn
155155
extension Configuration {
156+
// @unchecked Sendable because we need to capture UnsafePointers
157+
// to send to another thread. While UnsafePointers are not
158+
// Sendable, we are not mutating them -- we only need these type
159+
// for C interface.
160+
internal struct SpawnContext: @unchecked Sendable {
161+
let fileActions: posix_spawn_file_actions_t?
162+
let spawnAttributes: posix_spawnattr_t?
163+
let argv: [UnsafeMutablePointer<CChar>?]
164+
let env: [UnsafeMutablePointer<CChar>?]
165+
let uidPtr: UnsafeMutablePointer<uid_t>?
166+
let gidPtr: UnsafeMutablePointer<gid_t>?
167+
}
168+
156169
internal func spawn(
157170
withInput inputPipe: consuming CreatedPipe,
158171
outputPipe: consuming CreatedPipe,
159172
errorPipe: consuming CreatedPipe
160-
) throws -> SpawnResult {
173+
) async throws -> SpawnResult {
161174
// Instead of checking if every possible executable path
162175
// is valid, spawn each directly and catch ENOENT
163176
let possiblePaths = self.executable.possibleExecutablePaths(
@@ -167,7 +180,7 @@ extension Configuration {
167180
var outputPipeBox: CreatedPipe? = consume outputPipe
168181
var errorPipeBox: CreatedPipe? = consume errorPipe
169182

170-
return try self.preSpawn { args throws -> SpawnResult in
183+
return try await self.preSpawn { args throws -> SpawnResult in
171184
let (env, uidPtr, gidPtr, supplementaryGroups) = args
172185
var _inputPipe = inputPipeBox.take()!
173186
var _outputPipe = outputPipeBox.take()!
@@ -181,8 +194,6 @@ extension Configuration {
181194
let errorWriteFileDescriptor: IODescriptor? = _errorPipe.writeFileDescriptor()
182195

183196
for possibleExecutablePath in possiblePaths {
184-
var pid: pid_t = 0
185-
186197
// Setup Arguments
187198
let argv: [UnsafeMutablePointer<CChar>?] = self.arguments.createArgs(
188199
withExecutablePath: possibleExecutablePath
@@ -389,21 +400,35 @@ extension Configuration {
389400
}
390401

391402
// Spawn
392-
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
393-
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
394-
return _subprocess_spawn(
395-
&pid,
396-
exePath,
397-
&fileActions,
398-
&spawnAttributes,
399-
argv,
400-
env,
401-
uidPtr,
402-
gidPtr,
403-
Int32(supplementaryGroups?.count ?? 0),
404-
sgroups?.baseAddress,
405-
self.platformOptions.createSession ? 1 : 0
406-
)
403+
let spawnContext = SpawnContext(
404+
fileActions: fileActions,
405+
spawnAttributes: spawnAttributes,
406+
argv: argv,
407+
env: env,
408+
uidPtr: uidPtr,
409+
gidPtr: gidPtr
410+
)
411+
let (spawnError, pid) = try await runOnBackgroundThread {
412+
return possibleExecutablePath.withCString { exePath in
413+
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
414+
var pid: pid_t = 0
415+
var _fileActions = spawnContext.fileActions
416+
var _spawnAttributes = spawnContext.spawnAttributes
417+
let rc = _subprocess_spawn(
418+
&pid,
419+
exePath,
420+
&_fileActions,
421+
&_spawnAttributes,
422+
spawnContext.argv,
423+
spawnContext.env,
424+
spawnContext.uidPtr,
425+
spawnContext.gidPtr,
426+
Int32(supplementaryGroups?.count ?? 0),
427+
sgroups?.baseAddress,
428+
self.platformOptions.createSession ? 1 : 0
429+
)
430+
return (rc, pid)
431+
}
407432
}
408433
}
409434
// Spawn error

0 commit comments

Comments
 (0)