diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index ee7ac0f5..516ef3b9 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -557,6 +557,89 @@ public func run< ) } +public enum PipelineError: Error { + case runErrors([Swift.Error]) +} + +/// Run a series of `Configuration` asynchronously and returns +/// a `CollectedResult` collecting the result of the final process at the +/// end of a pipeline of all processes joined together with pipes from standard input +/// to standard output. +/// +/// - Parameters: +/// - configurations: The `Subprocess` configurations for the start of the pipeline +/// - last: The final configuration that produces the output +/// - input: The input to send to the first executable. +/// - output: The method to use for redirecting the standard output of the last executable. +/// - error: The method to use for redirecting the standard error of all executables. +/// - Returns a CollectedResult containing the result of the pipeline and the termination status of the final executable. +func run(_ first: Configuration, _ pipeline: Configuration..., to last: Configuration, input: Input = .none, output: Output, error: Error = .discarded) async throws -> CollectedResult { + var pipes = [try FileDescriptor.pipe()] + for _ in pipeline { + pipes.append(try FileDescriptor.pipe()) + } + + let finalPipes = pipes + + let result = await withTaskGroup { taskGroup -> Result,PipelineError> in + taskGroup.addTask { () -> Result?,Swift.Error> in + do { + _ = try await run(first, input: input, output: .fileDescriptor(finalPipes.first!.writeEnd, closeAfterSpawningProcess: true), error: error) + return Result.success(nil) + } catch { + return Result.failure(error) + } + } + + for (idx, p) in pipeline.enumerated() { + taskGroup.addTask { () -> Result?,Swift.Error> in + do { + _ = try await run(p, input: .fileDescriptor(finalPipes[idx+1].readEnd, closeAfterSpawningProcess: true), output: .fileDescriptor(finalPipes[idx].writeEnd, closeAfterSpawningProcess: true), error: error) + return Result.success(nil) + } catch { + return Result.failure(error) + } + } + } + + taskGroup.addTask { () -> Result?,Swift.Error> in + do { + let collectedResult = try await run(last, input: .fileDescriptor(finalPipes.last!.readEnd, closeAfterSpawningProcess: true), output: output, error: error) + return Result.success(collectedResult) + } catch { + return Result.failure(error) + } + } + + var errors: [Swift.Error] = [] + var collection: CollectedResult? + + while let taskVal = await taskGroup.next() { + switch taskVal { + case .failure(let error): + errors.append(error) + case .success(.some(let c)): + collection = c + case .success(.none): + continue + } + } + + if errors.isEmpty { + return Result.failure(PipelineError.runErrors(errors)) + } + + return Result.success(collection!) + } + + switch result { + case .success(let collection): + return collection + case .failure(let pipelineError): + throw pipelineError + } +} + /// Run an executable with given parameters specified by a `Configuration` /// - Parameters: /// - configuration: The `Subprocess` configuration to run.