Skip to content

Commit ca05b23

Browse files
Merge pull request #83 from swiftlang/eng/PR-term
Process termination monitoring implementation on Linux conflicts with processes spawned by other means
2 parents 4211d5f + 7faeeb8 commit ca05b23

File tree

1 file changed

+98
-70
lines changed

1 file changed

+98
-70
lines changed

Sources/Subprocess/Platforms/Subprocess+Linux.swift

Lines changed: 98 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ extension Configuration {
3737
outputPipe: consuming CreatedPipe,
3838
errorPipe: consuming CreatedPipe
3939
) throws -> SpawnResult {
40+
// Ensure the waiter thread is running.
4041
_setupMonitorSignalHandler()
4142

4243
// Instead of checking if every possible executable path
@@ -266,32 +267,53 @@ extension String {
266267
internal func monitorProcessTermination(
267268
forProcessWithIdentifier pid: ProcessIdentifier
268269
) async throws -> TerminationStatus {
269-
return try await withCheckedThrowingContinuation { continuation in
270+
try await withCheckedThrowingContinuation { continuation in
270271
_childProcessContinuations.withLock { continuations in
271-
if let existing = continuations.removeValue(forKey: pid.value),
272-
case .status(let existingStatus) = existing
273-
{
274-
// We already have existing status to report
275-
continuation.resume(returning: existingStatus)
276-
} else {
277-
// Save the continuation for handler
278-
continuations[pid.value] = .continuation(continuation)
279-
}
272+
// We don't need to worry about a race condition here because waitid()
273+
// does not clear the wait/zombie state of the child process. If it sees
274+
// the child process has terminated and manages to acquire the lock before
275+
// we add this continuation to the dictionary, then it will simply loop
276+
// and report the status again.
277+
let oldContinuation = continuations.updateValue(continuation, forKey: pid.value)
278+
precondition(oldContinuation == nil)
279+
280+
// Wake up the waiter thread if it is waiting for more child processes.
281+
_ = pthread_cond_signal(_waitThreadNoChildrenCondition)
280282
}
281283
}
282284
}
283285

284-
private enum ContinuationOrStatus {
285-
case continuation(CheckedContinuation<TerminationStatus, any Error>)
286-
case status(TerminationStatus)
286+
// Small helper to provide thread-safe access to the child process to continuations map as well as a condition variable to suspend the calling thread when there are no subprocesses to wait for. Note that Mutex cannot be used here because we need the semantics of pthread_cond_wait, which requires passing the pthread_mutex_t instance as a parameter, something the Mutex API does not provide access to.
287+
private final class ChildProcessContinuations: Sendable {
288+
private nonisolated(unsafe) var continuations = [pid_t: CheckedContinuation<TerminationStatus, any Error>]()
289+
private nonisolated(unsafe) let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
290+
291+
init() {
292+
pthread_mutex_init(mutex, nil)
293+
}
294+
295+
func withLock<R>(_ body: (inout [pid_t: CheckedContinuation<TerminationStatus, any Error>]) throws -> R) rethrows -> R {
296+
try withUnsafeUnderlyingLock { _, continuations in
297+
try body(&continuations)
298+
}
299+
}
300+
301+
func withUnsafeUnderlyingLock<R>(_ body: (UnsafeMutablePointer<pthread_mutex_t>, inout [pid_t: CheckedContinuation<TerminationStatus, any Error>]) throws -> R) rethrows -> R {
302+
pthread_mutex_lock(mutex)
303+
defer {
304+
pthread_mutex_unlock(mutex)
305+
}
306+
return try body(mutex, &continuations)
307+
}
287308
}
288309

289-
private let _childProcessContinuations:
290-
Mutex<
291-
[pid_t: ContinuationOrStatus]
292-
> = Mutex([:])
310+
private let _childProcessContinuations = ChildProcessContinuations()
293311

294-
private let signalSource: SendableSourceSignal = SendableSourceSignal()
312+
private nonisolated(unsafe) let _waitThreadNoChildrenCondition = {
313+
let result = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
314+
_ = pthread_cond_init(result, nil)
315+
return result
316+
}()
295317

296318
private extension siginfo_t {
297319
var si_status: Int32 {
@@ -316,64 +338,70 @@ private extension siginfo_t {
316338
}
317339

318340
private let setup: () = {
319-
signalSource.setEventHandler {
320-
while true {
321-
var siginfo = siginfo_t()
322-
guard waitid(P_ALL, id_t(0), &siginfo, WEXITED) == 0 || errno == EINTR else {
323-
return
324-
}
325-
var status: TerminationStatus? = nil
326-
switch siginfo.si_code {
327-
case .init(CLD_EXITED):
328-
status = .exited(siginfo.si_status)
329-
case .init(CLD_KILLED), .init(CLD_DUMPED):
330-
status = .unhandledException(siginfo.si_status)
331-
case .init(CLD_TRAPPED), .init(CLD_STOPPED), .init(CLD_CONTINUED):
332-
// Ignore these signals because they are not related to
333-
// process exiting
334-
break
335-
default:
336-
fatalError("Unexpected exit status: \(siginfo.si_code)")
337-
}
338-
if let status = status {
339-
_childProcessContinuations.withLock { continuations in
341+
// Create the thread. It will run immediately; because it runs in an infinite
342+
// loop, we aren't worried about detaching or joining it.
343+
var thread = pthread_t()
344+
_ = pthread_create(
345+
&thread,
346+
nil,
347+
{ _ -> UnsafeMutableRawPointer? in
348+
// Run an infinite loop that waits for child processes to terminate and
349+
// captures their exit statuses.
350+
while true {
351+
// Listen for child process exit events. WNOWAIT means we don't perturb the
352+
// state of a terminated (zombie) child process, allowing us to fetch the
353+
// continuation (if available) before reaping.
354+
var siginfo = siginfo_t()
355+
errno = 0
356+
if waitid(P_ALL, id_t(0), &siginfo, WEXITED | WNOWAIT) == 0 {
340357
let pid = siginfo.si_pid
341-
if let existing = continuations.removeValue(forKey: pid),
342-
case .continuation(let c) = existing
343-
{
344-
c.resume(returning: status)
345-
} else {
346-
// We don't have continuation yet, just state status
347-
continuations[pid] = .status(status)
358+
359+
// If we had a continuation for this PID, allow the process to be reaped
360+
// and pass the resulting exit condition back to the calling task. If
361+
// there is no continuation, then either it hasn't been stored yet or
362+
// this child process is not tracked by the waiter thread.
363+
guard pid != 0, let c = _childProcessContinuations.withLock({ $0.removeValue(forKey: pid) }) else {
364+
continue
365+
}
366+
367+
c.resume(with: Result {
368+
// Here waitid should not block because `pid` has already terminated at this point.
369+
while true {
370+
var siginfo = siginfo_t()
371+
errno = 0
372+
if waitid(P_PID, numericCast(pid), &siginfo, WEXITED) == 0 {
373+
var status: TerminationStatus? = nil
374+
switch siginfo.si_code {
375+
case .init(CLD_EXITED):
376+
return .exited(siginfo.si_status)
377+
case .init(CLD_KILLED), .init(CLD_DUMPED):
378+
return .unhandledException(siginfo.si_status)
379+
default:
380+
fatalError("Unexpected exit status: \(siginfo.si_code)")
381+
}
382+
} else if errno != EINTR {
383+
throw SubprocessError.UnderlyingError(rawValue: errno)
384+
}
385+
}
386+
})
387+
} else if errno == ECHILD {
388+
// We got ECHILD. If there are no continuations added right now, we should
389+
// suspend this thread on the no-children condition until it's awoken by a
390+
// newly-scheduled waiter process. (If this condition is spuriously
391+
// woken, we'll just loop again, which is fine.) Note that we read errno
392+
// outside the lock in case acquiring the lock perturbs it.
393+
_childProcessContinuations.withUnsafeUnderlyingLock { lock, childProcessContinuations in
394+
if childProcessContinuations.isEmpty {
395+
_ = pthread_cond_wait(_waitThreadNoChildrenCondition, lock)
396+
}
348397
}
349398
}
350399
}
351-
}
352-
}
353-
signalSource.resume()
400+
},
401+
nil
402+
)
354403
}()
355404

356-
/// Unchecked Sendable here since this class is only explicitly
357-
/// initialized once during the lifetime of the process
358-
final class SendableSourceSignal: @unchecked Sendable {
359-
private let signalSource: DispatchSourceSignal
360-
361-
func setEventHandler(handler: @escaping DispatchSourceHandler) {
362-
self.signalSource.setEventHandler(handler: handler)
363-
}
364-
365-
func resume() {
366-
self.signalSource.resume()
367-
}
368-
369-
init() {
370-
self.signalSource = DispatchSource.makeSignalSource(
371-
signal: SIGCHLD,
372-
queue: .global()
373-
)
374-
}
375-
}
376-
377405
private func _setupMonitorSignalHandler() {
378406
// Only executed once
379407
setup

0 commit comments

Comments
 (0)