@@ -266,32 +266,56 @@ extension String {
266
266
internal func monitorProcessTermination(
267
267
forProcessWithIdentifier pid: ProcessIdentifier
268
268
) async throws -> TerminationStatus {
269
+ // Ensure the waiter thread is running.
270
+ _setupMonitorSignalHandler ( )
271
+
269
272
return try await withCheckedThrowingContinuation { continuation in
270
273
_childProcessContinuations. withLock { continuations in
271
- if let existing = continuations. removeValue ( forKey: pid. value) ,
272
- case . status( let existingStatus) = existing
273
- {
274
- // We already have existing status to report
275
- continuation. resume ( returning: existingStatus)
276
- } else {
277
- // Save the continuation for handler
278
- continuations [ pid. value] = . continuation( continuation)
279
- }
274
+ // We don't need to worry about a race condition here because waitid()
275
+ // does not clear the wait/zombie state of the child process. If it sees
276
+ // the child process has terminated and manages to acquire the lock before
277
+ // we add this continuation to the dictionary, then it will simply loop
278
+ // and report the status again.
279
+ let oldContinuation = continuations. updateValue ( continuation, forKey: pid. value)
280
+ precondition ( oldContinuation == nil )
281
+
282
+ // Wake up the waiter thread if it is waiting for more child processes.
283
+ _ = pthread_cond_signal ( _waitThreadNoChildrenCondition)
280
284
}
281
285
}
282
286
}
283
287
284
- private enum ContinuationOrStatus {
285
- case continuation( CheckedContinuation < TerminationStatus , any Error > )
286
- case status( TerminationStatus )
288
+ // Small helper to provide thread-safe access to the child process to continuations map as well as a condition variable to suspend the calling thread when there are no subprocesses to wait for. Note that Mutex cannot be used here because we need the semantics of pthread_cond_wait, which requires passing the pthread_mutex_t instance as a parameter, something the Mutex API does not provide access to.
289
+ private final class ChildProcessContinuations : Sendable {
290
+ private nonisolated ( unsafe) var continuations = [ pid_t : CheckedContinuation < TerminationStatus , any Error > ] ( )
291
+ private nonisolated ( unsafe) let mutex = UnsafeMutablePointer< pthread_mutex_t> . allocate( capacity: 1 )
292
+
293
+ init ( ) {
294
+ pthread_mutex_init ( mutex, nil )
295
+ }
296
+
297
+ func withLock< R> ( _ body: ( inout [ pid_t : CheckedContinuation < TerminationStatus , any Error > ] ) throws -> R ) rethrows -> R {
298
+ try withUnsafeUnderlyingLock { _, continuations in
299
+ try body ( & continuations)
300
+ }
301
+ }
302
+
303
+ func withUnsafeUnderlyingLock< R> ( _ body: ( UnsafeMutablePointer < pthread_mutex_t > , inout [ pid_t : CheckedContinuation < TerminationStatus , any Error > ] ) throws -> R ) rethrows -> R {
304
+ pthread_mutex_lock ( mutex)
305
+ defer {
306
+ pthread_mutex_unlock ( mutex)
307
+ }
308
+ return try body ( mutex, & continuations)
309
+ }
287
310
}
288
311
289
- private let _childProcessContinuations :
290
- Mutex <
291
- [ pid_t : ContinuationOrStatus ]
292
- > = Mutex ( [ : ] )
312
+ private let _childProcessContinuations = ChildProcessContinuations ( )
293
313
294
- private let signalSource : SendableSourceSignal = SendableSourceSignal ( )
314
+ private nonisolated ( unsafe) let _waitThreadNoChildrenCondition = {
315
+ let result = UnsafeMutablePointer< pthread_cond_t> . allocate( capacity: 1 )
316
+ _ = pthread_cond_init ( result, nil )
317
+ return result
318
+ } ( )
295
319
296
320
private extension siginfo_t {
297
321
var si_status : Int32 {
@@ -316,64 +340,70 @@ private extension siginfo_t {
316
340
}
317
341
318
342
private let setup : ( ) = {
319
- signalSource. setEventHandler {
320
- while true {
321
- var siginfo = siginfo_t ( )
322
- guard waitid ( P_ALL, id_t ( 0 ) , & siginfo, WEXITED) == 0 || errno == EINTR else {
323
- return
324
- }
325
- var status : TerminationStatus ? = nil
326
- switch siginfo. si_code {
327
- case . init( CLD_EXITED) :
328
- status = . exited( siginfo. si_status)
329
- case . init( CLD_KILLED) , . init( CLD_DUMPED) :
330
- status = . unhandledException( siginfo. si_status)
331
- case . init( CLD_TRAPPED) , . init( CLD_STOPPED) , . init( CLD_CONTINUED) :
332
- // Ignore these signals because they are not related to
333
- // process exiting
334
- break
335
- default :
336
- fatalError ( " Unexpected exit status: \( siginfo. si_code) " )
337
- }
338
- if let status = status {
339
- _childProcessContinuations. withLock { continuations in
343
+ // Create the thread. It will run immediately; because it runs in an infinite
344
+ // loop, we aren't worried about detaching or joining it.
345
+ var thread = pthread_t ( )
346
+ _ = pthread_create (
347
+ & thread,
348
+ nil ,
349
+ { _ -> UnsafeMutableRawPointer ? in
350
+ // Run an infinite loop that waits for child processes to terminate and
351
+ // captures their exit statuses.
352
+ while true {
353
+ // Listen for child process exit events. WNOWAIT means we don't perturb the
354
+ // state of a terminated (zombie) child process, allowing us to fetch the
355
+ // continuation (if available) before reaping.
356
+ var siginfo = siginfo_t ( )
357
+ errno = 0
358
+ if waitid ( P_ALL, id_t ( 0 ) , & siginfo, WEXITED | WNOWAIT) == 0 {
340
359
let pid = siginfo. si_pid
341
- if let existing = continuations. removeValue ( forKey: pid) ,
342
- case . continuation( let c) = existing
343
- {
344
- c. resume ( returning: status)
345
- } else {
346
- // We don't have continuation yet, just state status
347
- continuations [ pid] = . status( status)
360
+
361
+ // If we had a continuation for this PID, allow the process to be reaped
362
+ // and pass the resulting exit condition back to the calling task. If
363
+ // there is no continuation, then either it hasn't been stored yet or
364
+ // this child process is not tracked by the waiter thread.
365
+ guard pid != 0 , let c = _childProcessContinuations. withLock ( { $0. removeValue ( forKey: pid) } ) else {
366
+ continue
367
+ }
368
+
369
+ c. resume ( with: Result {
370
+ // Here waitid should not block because `pid` has already terminated at this point.
371
+ while true {
372
+ var siginfo = siginfo_t ( )
373
+ errno = 0
374
+ if waitid ( P_PID, numericCast ( pid) , & siginfo, WEXITED) == 0 {
375
+ var status : TerminationStatus ? = nil
376
+ switch siginfo. si_code {
377
+ case . init( CLD_EXITED) :
378
+ return . exited( siginfo. si_status)
379
+ case . init( CLD_KILLED) , . init( CLD_DUMPED) :
380
+ return . unhandledException( siginfo. si_status)
381
+ default :
382
+ fatalError ( " Unexpected exit status: \( siginfo. si_code) " )
383
+ }
384
+ } else if errno != EINTR {
385
+ throw SubprocessError . UnderlyingError ( rawValue: errno)
386
+ }
387
+ }
388
+ } )
389
+ } else if errno == ECHILD {
390
+ // We got ECHILD. If there are no continuations added right now, we should
391
+ // suspend this thread on the no-children condition until it's awoken by a
392
+ // newly-scheduled waiter process. (If this condition is spuriously
393
+ // woken, we'll just loop again, which is fine.) Note that we read errno
394
+ // outside the lock in case acquiring the lock perturbs it.
395
+ _childProcessContinuations. withUnsafeUnderlyingLock { lock, childProcessContinuations in
396
+ if childProcessContinuations. isEmpty {
397
+ _ = pthread_cond_wait ( _waitThreadNoChildrenCondition, lock)
398
+ }
348
399
}
349
400
}
350
401
}
351
- }
352
- }
353
- signalSource . resume ( )
402
+ } ,
403
+ nil
404
+ )
354
405
} ( )
355
406
356
- /// Unchecked Sendable here since this class is only explicitly
357
- /// initialized once during the lifetime of the process
358
- final class SendableSourceSignal : @unchecked Sendable {
359
- private let signalSource : DispatchSourceSignal
360
-
361
- func setEventHandler( handler: @escaping DispatchSourceHandler ) {
362
- self . signalSource. setEventHandler ( handler: handler)
363
- }
364
-
365
- func resume( ) {
366
- self . signalSource. resume ( )
367
- }
368
-
369
- init ( ) {
370
- self . signalSource = DispatchSource . makeSignalSource (
371
- signal: SIGCHLD,
372
- queue: . global( )
373
- )
374
- }
375
- }
376
-
377
407
private func _setupMonitorSignalHandler( ) {
378
408
// Only executed once
379
409
setup
0 commit comments