diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index d67eede..0f4d533 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -74,23 +74,17 @@ public struct Configuration: Sendable { let pid = spawnResults.execution.processIdentifier var spawnResultBox: SpawnResult?? = consume spawnResults + var _spawnResult = spawnResultBox!.take()! - return try await withAsyncTaskCleanupHandler { - var _spawnResult = spawnResultBox!.take()! + let processIdentifier = _spawnResult.execution.processIdentifier + + let result = try await withAsyncTaskCleanupHandler { let inputIO = _spawnResult.inputWriteEnd() let outputIO = _spawnResult.outputReadEnd() let errorIO = _spawnResult.errorReadEnd() - let processIdentifier = _spawnResult.execution.processIdentifier - async let terminationStatus = try monitorProcessTermination( - forProcessWithIdentifier: processIdentifier - ) // Body runs in the same isolation - let result = try await body(_spawnResult.execution, inputIO, outputIO, errorIO) - return ExecutionResult( - terminationStatus: try await terminationStatus, - value: result - ) + return try await body(_spawnResult.execution, inputIO, outputIO, errorIO) } onCleanup: { // Attempt to terminate the child process await Execution.runTeardownSequence( @@ -98,6 +92,11 @@ public struct Configuration: Sendable { on: pid ) } + + return ExecutionResult( + terminationStatus: try await monitorProcessTermination(forProcessWithIdentifier: processIdentifier), + value: result + ) } } @@ -752,11 +751,13 @@ extension Optional where Wrapped == String { } } +/// Runs `body`, and then runs `onCleanup` if body throws an error, or if the parent task is cancelled. In the latter case, `onCleanup` may be run concurrently with `body`. `body` is guaranteed to run exactly once. `onCleanup` is guaranteed to run only once, or not at all. internal func withAsyncTaskCleanupHandler( _ body: () async throws -> Result, onCleanup handler: @Sendable @escaping () async -> Void, isolation: isolated (any Actor)? = #isolation ) async rethrows -> Result { + let (runCancellationHandlerStream, runCancellationHandlerContinuation) = AsyncThrowingStream.makeStream(of: Void.self) return try await withThrowingTaskGroup( of: Void.self, returning: Result.self @@ -767,15 +768,38 @@ internal func withAsyncTaskCleanupHandler( // before the time ends. We then run the cancel handler. do { while true { try await Task.sleep(nanoseconds: 1_000_000_000) } } catch {} // Run task cancel handler - await handler() + runCancellationHandlerContinuation.finish(throwing: CancellationError()) + } + + group.addTask { + // Enumerate the async stream until it completes or throws an error. + // Since we signal completion of the stream from cancellation or the + // parent task or the body throwing, this ensures that we run the + // cleanup handler exactly once in any failure scenario, and also do + // so _immediately_ if the failure scenario is due to parent task + // cancellation. We do so in a detached Task to prevent cancellation + // of the parent task from interrupting enumeration of the stream itself. + await Task.detached { + do { + var iterator = runCancellationHandlerStream.makeAsyncIterator() + while let _ = try await iterator.next() { + } + } catch { + await handler() + } + }.value + } + + defer { + group.cancelAll() } do { let result = try await body() - group.cancelAll() + runCancellationHandlerContinuation.finish() return result } catch { - await handler() + runCancellationHandlerContinuation.finish(throwing: error) throw error } }