Skip to content

Commit 63ddb46

Browse files
committed
Call spawn() on a background thread because it might block
Introduce runOnBackgroundThread() to run closures on a background thread without blocking the Swift Concurrency thread pool
1 parent 680621b commit 63ddb46

File tree

7 files changed

+428
-52
lines changed

7 files changed

+428
-52
lines changed

Sources/Subprocess/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ add_library(Subprocess
1414
Buffer.swift
1515
Error.swift
1616
Teardown.swift
17+
Thread.swift
1718
Result.swift
1819
IO/Output.swift
1920
IO/Input.swift

Sources/Subprocess/Configuration.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public struct Configuration: Sendable {
8484
isolation: isolated (any Actor)? = #isolation,
8585
_ body: ((Execution, consuming IOChannel?, consuming IOChannel?, consuming IOChannel?) async throws -> Result)
8686
) async throws -> ExecutionResult<Result> {
87-
let spawnResults = try self.spawn(
87+
let spawnResults = try await self.spawn(
8888
withInput: input,
8989
outputPipe: output,
9090
errorPipe: error

Sources/Subprocess/IO/AsyncIO+Windows.swift

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ final class AsyncIO: @unchecked Sendable {
4343
) rethrows -> ResultType
4444
}
4545

46-
private final class MonitorThreadContext {
46+
private struct MonitorThreadContext: @unchecked Sendable {
4747
let ioCompletionPort: HANDLE
4848

4949
init(ioCompletionPort: HANDLE) {
@@ -60,11 +60,9 @@ final class AsyncIO: @unchecked Sendable {
6060
internal init() {
6161
var maybeSetupError: SubprocessError? = nil
6262
// Create the the completion port
63-
guard
64-
let port = CreateIoCompletionPort(
65-
INVALID_HANDLE_VALUE, nil, 0, 0
66-
), port != INVALID_HANDLE_VALUE
67-
else {
63+
guard let ioCompletionPort = CreateIoCompletionPort(
64+
INVALID_HANDLE_VALUE, nil, 0, 0
65+
), ioCompletionPort != INVALID_HANDLE_VALUE else {
6866
let error = SubprocessError(
6967
code: .init(.asyncIOFailed("CreateIoCompletionPort failed")),
7068
underlyingError: .init(rawValue: GetLastError())
@@ -73,17 +71,12 @@ final class AsyncIO: @unchecked Sendable {
7371
self.monitorThread = .failure(error)
7472
return
7573
}
76-
self.ioCompletionPort = .success(port)
74+
self.ioCompletionPort = .success(ioCompletionPort)
7775
// Create monitor thread
78-
let threadContext = MonitorThreadContext(ioCompletionPort: port)
79-
let threadContextPtr = Unmanaged.passRetained(threadContext)
80-
/// Microsoft documentation for `CreateThread` states:
81-
/// > A thread in an executable that calls the C run-time library (CRT)
82-
/// > should use the _beginthreadex and _endthreadex functions for
83-
/// > thread management rather than CreateThread and ExitThread
84-
let threadHandleValue = _beginthreadex(
85-
nil, 0,
86-
{ args in
76+
let context = MonitorThreadContext(ioCompletionPort: ioCompletionPort)
77+
let threadHandle: HANDLE
78+
do {
79+
threadHandle = try begin_thread_x {
8780
func reportError(_ error: SubprocessError) {
8881
let continuations = _registration.withLock { store in
8982
return store.values
@@ -93,15 +86,50 @@ final class AsyncIO: @unchecked Sendable {
9386
}
9487
}
9588

96-
let unmanaged = Unmanaged<MonitorThreadContext>.fromOpaque(args!)
97-
let context = unmanaged.takeRetainedValue()
98-
89+
// Monitor loop
90+
while true {
91+
var bytesTransferred: DWORD = 0
92+
var targetFileDescriptor: UInt64 = 0
93+
var overlapped: LPOVERLAPPED? = nil
9994
// Monitor loop
10095
while true {
10196
var bytesTransferred: DWORD = 0
10297
var targetFileDescriptor: UInt64 = 0
10398
var overlapped: LPOVERLAPPED? = nil
10499

100+
let monitorResult = GetQueuedCompletionStatus(
101+
context.ioCompletionPort,
102+
&bytesTransferred,
103+
&targetFileDescriptor,
104+
&overlapped,
105+
INFINITE
106+
)
107+
if !monitorResult {
108+
let lastError = GetLastError()
109+
if lastError == ERROR_BROKEN_PIPE {
110+
// We finished reading the handle. Signal EOF by
111+
// finishing the stream.
112+
// NOTE: here we deliberately leave now unused continuation
113+
// in the store. Windows does not offer an API to remove a
114+
// HANDLE from an IOCP port, therefore we leave the registration
115+
// to signify the HANDLE has already been resisted.
116+
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
117+
if let continuation = store[targetFileDescriptor] {
118+
return continuation
119+
}
120+
return nil
121+
}
122+
continuation?.finish()
123+
continue
124+
} else {
125+
let error = SubprocessError(
126+
code: .init(.asyncIOFailed("GetQueuedCompletionStatus failed")),
127+
underlyingError: .init(rawValue: lastError)
128+
)
129+
reportError(error)
130+
break
131+
}
132+
}
105133
let monitorResult = GetQueuedCompletionStatus(
106134
context.ioCompletionPort,
107135
&bytesTransferred,
@@ -149,16 +177,13 @@ final class AsyncIO: @unchecked Sendable {
149177
}
150178
continuation?.yield(bytesTransferred)
151179
}
180+
152181
return 0
153-
}, threadContextPtr.toOpaque(), 0, nil)
154-
guard threadHandleValue > 0,
155-
let threadHandle = HANDLE(bitPattern: threadHandleValue)
156-
else {
157-
// _beginthreadex uses errno instead of GetLastError()
158-
let capturedError = _subprocess_windows_get_errno()
182+
}
183+
} catch let underlyingError {
159184
let error = SubprocessError(
160-
code: .init(.asyncIOFailed("_beginthreadex failed")),
161-
underlyingError: .init(rawValue: capturedError)
185+
code: .init(.asyncIOFailed("Failed to create monitor thread")),
186+
underlyingError: underlyingError
162187
)
163188
self.monitorThread = .failure(error)
164189
return

Sources/Subprocess/Platforms/Subprocess+Darwin.swift

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,24 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
153153

154154
// MARK: - Spawn
155155
extension Configuration {
156+
// @unchecked Sendable because we need to capture UnsafePointers
157+
// to send to another thread. While UnsafePointers are not
158+
// Sendable, we are not mutating them -- we only need these type
159+
// for C interface.
160+
internal struct SpawnContext: @unchecked Sendable {
161+
let fileActions: posix_spawn_file_actions_t?
162+
let spawnAttributes: posix_spawnattr_t?
163+
let argv: [UnsafeMutablePointer<CChar>?]
164+
let env: [UnsafeMutablePointer<CChar>?]
165+
let uidPtr: UnsafeMutablePointer<uid_t>?
166+
let gidPtr: UnsafeMutablePointer<gid_t>?
167+
}
168+
156169
internal func spawn(
157170
withInput inputPipe: consuming CreatedPipe,
158171
outputPipe: consuming CreatedPipe,
159172
errorPipe: consuming CreatedPipe
160-
) throws -> SpawnResult {
173+
) async throws -> SpawnResult {
161174
// Instead of checking if every possible executable path
162175
// is valid, spawn each directly and catch ENOENT
163176
let possiblePaths = self.executable.possibleExecutablePaths(
@@ -167,7 +180,7 @@ extension Configuration {
167180
var outputPipeBox: CreatedPipe? = consume outputPipe
168181
var errorPipeBox: CreatedPipe? = consume errorPipe
169182

170-
return try self.preSpawn { args throws -> SpawnResult in
183+
return try await self.preSpawn { args throws -> SpawnResult in
171184
let (env, uidPtr, gidPtr, supplementaryGroups) = args
172185
var _inputPipe = inputPipeBox.take()!
173186
var _outputPipe = outputPipeBox.take()!
@@ -181,8 +194,6 @@ extension Configuration {
181194
let errorWriteFileDescriptor: IODescriptor? = _errorPipe.writeFileDescriptor()
182195

183196
for possibleExecutablePath in possiblePaths {
184-
var pid: pid_t = 0
185-
186197
// Setup Arguments
187198
let argv: [UnsafeMutablePointer<CChar>?] = self.arguments.createArgs(
188199
withExecutablePath: possibleExecutablePath
@@ -389,21 +400,35 @@ extension Configuration {
389400
}
390401

391402
// Spawn
392-
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
393-
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
394-
return _subprocess_spawn(
395-
&pid,
396-
exePath,
397-
&fileActions,
398-
&spawnAttributes,
399-
argv,
400-
env,
401-
uidPtr,
402-
gidPtr,
403-
Int32(supplementaryGroups?.count ?? 0),
404-
sgroups?.baseAddress,
405-
self.platformOptions.createSession ? 1 : 0
406-
)
403+
let spawnContext = SpawnContext(
404+
fileActions: fileActions,
405+
spawnAttributes: spawnAttributes,
406+
argv: argv,
407+
env: env,
408+
uidPtr: uidPtr,
409+
gidPtr: gidPtr
410+
)
411+
let (spawnError, pid) = try await runOnBackgroundThread {
412+
return possibleExecutablePath.withCString { exePath in
413+
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
414+
var pid: pid_t = 0
415+
var _fileActions = spawnContext.fileActions
416+
var _spawnAttributes = spawnContext.spawnAttributes
417+
let rc = _subprocess_spawn(
418+
&pid,
419+
exePath,
420+
&_fileActions,
421+
&_spawnAttributes,
422+
spawnContext.argv,
423+
spawnContext.env,
424+
spawnContext.uidPtr,
425+
spawnContext.gidPtr,
426+
Int32(supplementaryGroups?.count ?? 0),
427+
sgroups?.baseAddress,
428+
self.platformOptions.createSession ? 1 : 0
429+
)
430+
return (rc, pid)
431+
}
407432
}
408433
}
409434
// Spawn error

Sources/Subprocess/Platforms/Subprocess+Unix.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,8 @@ extension Configuration {
350350
)
351351

352352
internal func preSpawn<Result: ~Copyable>(
353-
_ work: (PreSpawnArgs) throws -> Result
354-
) throws -> Result {
353+
_ work: (PreSpawnArgs) async throws -> Result
354+
) async throws -> Result {
355355
// Prepare environment
356356
let env = self.environment.createEnv()
357357
defer {
@@ -378,7 +378,7 @@ extension Configuration {
378378
if let groupsValue = self.platformOptions.supplementaryGroups {
379379
supplementaryGroups = groupsValue
380380
}
381-
return try work(
381+
return try await work(
382382
(
383383
env: env,
384384
uidPtr: uidPtr,

0 commit comments

Comments
 (0)