Skip to content

Commit 398eb86

Browse files
authored
ProcessExecutor: add teardown sequences (#100)
This allows to write ``` let p = ProcessExecutor( executable: "/usr/bin/swift", [ "build", "-c", "release" ], teardownSequence: [ .sendSignal(SIGINT, allowedTimeToExitNS: 100_000_000), // SIGINT, then 100s .sendSignal(SIGTERM, allowedTimeToExitNS: 1_000_000_000), // SIGTERM, then 1s // and always SIGKILL if nothing worked ] ) ``` which allows the process (`swift build` in this case) to clean up its child processes.
1 parent f1d464d commit 398eb86

File tree

2 files changed

+322
-39
lines changed

2 files changed

+322
-39
lines changed

Sources/AsyncProcess/ProcessExecutor.swift

Lines changed: 178 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
//===----------------------------------------------------------------------===//
1212

1313
import Atomics
14-
import Foundation
1514
import Logging
1615
import NIO
1716

@@ -162,8 +161,47 @@ public final actor ProcessExecutor {
162161
private let _standardError: ChunkSequence
163162
private let processIsRunningApproximation = ManagedAtomic(RunningStateApproximation.neverStarted.rawValue)
164163
private let processOutputConsumptionApproximation = ManagedAtomic(UInt8(0))
164+
private let processPid = ManagedAtomic(pid_t(0))
165165
private let ownsStandardOutputWriteHandle: Bool
166166
private let ownsStandardErrorWriteHandle: Bool
167+
private let teardownSequence: TeardownSequence
168+
169+
public struct OSError: Error & Sendable & Hashable {
170+
public var errnoNumber: CInt
171+
public var function: String
172+
}
173+
174+
/// An ordered list of steps in order to tear down a process.
175+
///
176+
/// Always ends in sending a `SIGKILL` whether that's specified or not.
177+
public struct TeardownSequence: Sendable, ExpressibleByArrayLiteral, CustomStringConvertible {
178+
public typealias ArrayLiteralElement = TeardownStep
179+
180+
public init(arrayLiteral elements: TeardownStep...) {
181+
self.steps = (elements.map(\.backing)) + [.kill]
182+
}
183+
184+
public struct TeardownStep: Sendable {
185+
var backing: Backing
186+
187+
enum Backing {
188+
case sendSignal(CInt, allowedTimeNS: UInt64)
189+
case kill
190+
}
191+
192+
/// Send `signal` to process and give it `allowedTimeToExitNS` nanoseconds to exit before progressing
193+
/// to the next teardown step. The final teardown step is always sending a `SIGKILL`.
194+
public static func sendSignal(_ signal: CInt, allowedTimeToExitNS: UInt64) -> Self {
195+
Self(backing: .sendSignal(signal, allowedTimeNS: allowedTimeToExitNS))
196+
}
197+
}
198+
199+
var steps: [TeardownStep.Backing] = [.kill]
200+
201+
public var description: String {
202+
self.steps.map { "\($0)" }.joined(separator: ", ")
203+
}
204+
}
167205

168206
public var standardOutput: ChunkSequence {
169207
let afterValue = self.processOutputConsumptionApproximation.bitwiseXorThenLoad(
@@ -215,6 +253,8 @@ public final actor ProcessExecutor {
215253
/// - standardError: A description of what to do with the standard output of the child process (defaults to
216254
/// ``ProcessOutput/stream``
217255
/// which requires to consume it via ``ProcessExecutor/standardError``.
256+
/// - teardownSequence: What to do if ``ProcessExecutor`` needs to tear down the process abruptly
257+
/// (usually because of Swift Concurrency cancellation)
218258
/// - logger: Where to log diagnostic messages to (default to no where)
219259
public init<StandardInput: AsyncSequence & Sendable>(
220260
group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup,
@@ -224,6 +264,7 @@ public final actor ProcessExecutor {
224264
standardInput: StandardInput,
225265
standardOutput: ProcessOutput = .stream,
226266
standardError: ProcessOutput = .stream,
267+
teardownSequence: TeardownSequence = TeardownSequence(),
227268
logger: Logger = ProcessExecutor.disableLogging
228269
) where StandardInput.Element == ByteBuffer {
229270
self.group = group
@@ -232,6 +273,7 @@ public final actor ProcessExecutor {
232273
self.arguments = arguments
233274
self.standardInput = AnyAsyncSequence(standardInput)
234275
self.logger = logger
276+
self.teardownSequence = teardownSequence
235277

236278
self.standardInputPipe = StandardInput.self == EOFSequence<ByteBuffer>.self ? nil : Pipe()
237279

@@ -327,12 +369,14 @@ public final actor ProcessExecutor {
327369
}
328370

329371
deinit {
372+
let storedPid = self.processPid.load(ordering: .relaxed)
373+
assert(storedPid == 0 || storedPid == -1)
330374
let runningState = self.processIsRunningApproximation.load(ordering: .relaxed)
331375
assert(
332376
runningState == RunningStateApproximation.finishedExecuting.rawValue,
333377
"""
334378
Did you create a ProcessExecutor without run()ning it? \
335-
That's currently illegal:
379+
That's currently illegal: \
336380
illegal running state \(runningState) in deinit
337381
"""
338382
)
@@ -366,6 +410,64 @@ public final actor ProcessExecutor {
366410
)
367411
}
368412

413+
private func teardown(process: Process) async {
414+
let childPid = self.processPid.load(ordering: .sequentiallyConsistent)
415+
guard childPid != 0 else {
416+
self.logger.warning(
417+
"leaking Process because it hasn't got a process identifier (likely a Foundation.Process bug)",
418+
metadata: ["process": "\(process)"]
419+
)
420+
return
421+
}
422+
423+
var logger = self.logger
424+
logger[metadataKey: "pid"] = "\(childPid)"
425+
426+
loop: for step in self.teardownSequence.steps {
427+
if process.isRunning {
428+
logger.trace("running teardown sequence", metadata: ["step": "\(step)"])
429+
enum TeardownStepCompletion {
430+
case processHasExited
431+
case processStillAlive
432+
case killedTheProcess
433+
}
434+
let stepCompletion: TeardownStepCompletion
435+
switch step {
436+
case let .sendSignal(signal, allowedTimeNS):
437+
stepCompletion = await withTaskGroup(of: TeardownStepCompletion.self) { group in
438+
group.addTask {
439+
do {
440+
try await Task.sleep(nanoseconds: allowedTimeNS)
441+
return .processStillAlive
442+
} catch {
443+
return .processHasExited
444+
}
445+
}
446+
try? await self.sendSignal(signal)
447+
return await group.next()!
448+
}
449+
case .kill:
450+
logger.info("sending SIGKILL to process")
451+
kill(childPid, SIGKILL)
452+
stepCompletion = .killedTheProcess
453+
}
454+
logger.debug(
455+
"teardown sequence step complete",
456+
metadata: ["step": "\(step)", "outcome": "\(stepCompletion)"]
457+
)
458+
switch stepCompletion {
459+
case .processHasExited, .killedTheProcess:
460+
break loop
461+
case .processStillAlive:
462+
() // gotta continue
463+
}
464+
} else {
465+
logger.debug("child process already dead")
466+
break
467+
}
468+
}
469+
}
470+
369471
/// Run the process.
370472
///
371473
/// Calling `run()` will run the (sub-)process and return its ``ProcessExitReason`` when the execution completes.
@@ -407,6 +509,12 @@ public final actor ProcessExecutor {
407509
)
408510

409511
p.terminationHandler = { p in
512+
let pidExchangeWorked = self.processPid.compareExchange(
513+
expected: p.processIdentifier,
514+
desired: -1,
515+
ordering: .sequentiallyConsistent
516+
).exchanged
517+
assert(pidExchangeWorked)
410518
self.logger.debug(
411519
"finished running command",
412520
metadata: [
@@ -424,10 +532,12 @@ public final actor ProcessExecutor {
424532
)
425533
precondition(worked, "illegal running state \(original)")
426534

427-
if p.terminationReason == .uncaughtSignal {
428-
terminationStreamProducer.yield(.signal(p.terminationStatus))
429-
} else {
430-
terminationStreamProducer.yield(.exit(p.terminationStatus))
535+
for _ in 0..<2 {
536+
if p.terminationReason == .uncaughtSignal {
537+
terminationStreamProducer.yield(.signal(p.terminationStatus))
538+
} else {
539+
terminationStreamProducer.yield(.exit(p.terminationStatus))
540+
}
431541
}
432542
terminationStreamProducer.finish()
433543
}
@@ -455,8 +565,10 @@ public final actor ProcessExecutor {
455565
throw error
456566
}
457567

458-
// At this point, the process is running, we should therefore have a process ID.
459-
assert(p.processIdentifier != 0)
568+
// At this point, the process is running, we should therefore have a process ID (unless we're already dead).
569+
let childPid = p.processIdentifier
570+
_ = self.processPid.compareExchange(expected: 0, desired: childPid, ordering: .sequentiallyConsistent)
571+
assert(childPid != 0 || !p.isRunning)
460572
self.logger.debug(
461573
"running command",
462574
metadata: [
@@ -474,48 +586,64 @@ public final actor ProcessExecutor {
474586
try! self.standardErrorWriteHandle?.close() // Must work.
475587
}
476588

477-
@Sendable
478-
func cancel() {
479-
let childPid = p.processIdentifier
480-
guard childPid != 0 else {
481-
self.logger.warning(
482-
"leaking Process because it hasn't got a process identifier (likely a Foundation.Process bug)",
483-
metadata: ["process": "\(p)"]
484-
)
485-
return
486-
}
487-
if p.isRunning {
488-
self.logger.info("terminating process", metadata: ["pid": "\(childPid)"])
489-
kill(childPid, SIGKILL)
490-
} else {
491-
self.logger.debug("child process already dead", metadata: ["pid-if-available": "\(childPid)"])
492-
}
493-
}
494-
495589
@Sendable
496590
func waitForChildToExit() async -> ProcessExitReason {
497-
// We do need for the child to exit (and it will, we SIGKILL'd it)
591+
// Please note, we're invoking this function multiple times concurrently, so we're relying on AsyncStream
592+
// supporting this.
593+
594+
// We do need for the child to exit (and it will, we'll eventually SIGKILL it)
498595
await withUncancelledTask(returning: ProcessExitReason.self) {
499596
var iterator = terminationStreamConsumer.makeAsyncIterator()
500597

501598
// Let's wait for the process to finish (it will)
502599
guard let terminationStatus = await iterator.next() else {
503600
fatalError("terminationStream finished without giving us a result")
504601
}
505-
506-
// Just double check that `finish()` has immediately been called too.
507-
let thisMustBeNil = await iterator.next()
508-
precondition(thisMustBeNil == nil)
509602
return terminationStatus
510603
}
511604
}
512605

513-
return try await withThrowingTaskGroup(of: ProcessExitReason?.self, returning: ProcessExitReason.self) {
514-
group in
515-
group.addTask {
516-
await withTaskCancellationHandler(operation: waitForChildToExit, onCancel: cancel)
606+
return try await withThrowingTaskGroup(
607+
of: ProcessExitReason?.self,
608+
returning: ProcessExitReason.self
609+
) { runProcessGroup async throws -> ProcessExitReason in
610+
runProcessGroup.addTask {
611+
await withTaskGroup(of: Void.self) { triggerTeardownGroup in
612+
triggerTeardownGroup.addTask {
613+
// wait until cancelled
614+
do { while true {
615+
try await Task.sleep(nanoseconds: 1_000_000_000)
616+
} } catch {}
617+
618+
let isRunning = self.processIsRunningApproximation.load(ordering: .relaxed)
619+
guard isRunning != RunningStateApproximation.finishedExecuting.rawValue else {
620+
self.logger.trace("skipping teardown, already finished executing")
621+
return
622+
}
623+
let pid = self.processPid.load(ordering: .relaxed)
624+
var logger = self.logger
625+
logger[metadataKey: "pid"] = "\(pid)"
626+
logger.debug("we got cancelled")
627+
await withUncancelledTask {
628+
await withTaskGroup(of: Void.self) { runTeardownStepsGroup in
629+
runTeardownStepsGroup.addTask {
630+
await self.teardown(process: p)
631+
}
632+
runTeardownStepsGroup.addTask {
633+
_ = await waitForChildToExit()
634+
}
635+
await runTeardownStepsGroup.next()!
636+
runTeardownStepsGroup.cancelAll()
637+
}
638+
}
639+
}
640+
641+
let result = await waitForChildToExit()
642+
triggerTeardownGroup.cancelAll() // This triggers the teardown
643+
return result
644+
}
517645
}
518-
group.addTask {
646+
runProcessGroup.addTask {
519647
if let stdinPipe = self.standardInputPipe {
520648
let fdForNIO = dup(stdinPipe.fileHandleForWriting.fileDescriptor)
521649
try! stdinPipe.fileHandleForWriting.close()
@@ -530,15 +658,25 @@ public final actor ProcessExecutor {
530658
}
531659

532660
var exitReason: ProcessExitReason? = nil
533-
// cannot fix this warning yet (rdar://113844171)
534-
while let result = try await group.next() {
661+
while let result = try await runProcessGroup.next() {
535662
if let result {
536663
exitReason = result
537664
}
538665
}
539666
return exitReason! // must work because the real task will return a reason (or throw)
540667
}
541668
}
669+
670+
public func sendSignal(_ signal: CInt) async throws {
671+
let pid = self.processPid.load(ordering: .sequentiallyConsistent)
672+
if pid == 0 || pid == -1 {
673+
throw OSError(errnoNumber: ESRCH, function: "sendSignal")
674+
}
675+
let ret = kill(pid, signal)
676+
if ret == -1 {
677+
throw OSError(errnoNumber: errno, function: "kill")
678+
}
679+
}
542680
}
543681

544682
public extension ProcessExecutor {
@@ -581,6 +719,7 @@ public extension ProcessExecutor {
581719
environment: [String: String] = [:],
582720
standardOutput: ProcessOutput = .stream,
583721
standardError: ProcessOutput = .stream,
722+
teardownSequence: TeardownSequence = TeardownSequence(),
584723
logger: Logger = ProcessExecutor.disableLogging
585724
) {
586725
self.init(
@@ -591,6 +730,7 @@ public extension ProcessExecutor {
591730
standardInput: EOFSequence(),
592731
standardOutput: standardOutput,
593732
standardError: standardError,
733+
teardownSequence: teardownSequence,
594734
logger: logger
595735
)
596736
}

0 commit comments

Comments
 (0)