diff --git a/PIPE_CONFIGURATION_USAGE.md b/PIPE_CONFIGURATION_USAGE.md new file mode 100644 index 0000000..83b1eb1 --- /dev/null +++ b/PIPE_CONFIGURATION_USAGE.md @@ -0,0 +1,496 @@ +# PipeConfiguration with Stage Arrays + +This document demonstrates the usage of `PipeConfiguration` with stage arrays and the visually appealing `|>` operator for final I/O specification. + +## Key Features + +- **Type-safe pipeline construction** with generic parameters +- **Clean stage array API** - I/O configuration specified at the end with `.finally()` or `|>` +- **Shell-like operators** - `|` for intermediate processes, `|>` for final I/O specification +- **Concurrent execution** - automatic process parallelization with `withThrowingTaskGroup` +- **Flexible error redirection** - control how stderr is handled in pipelines + +## API Design Philosophy + +The `PipeConfiguration` API uses a **stage array pattern**: + +1. **pipe() functions return stage arrays** - when you call `pipe()`, it returns `[PipeStage]` +2. **Pipe operators build stage arrays** - intermediate stages build up arrays of `PipeStage` +3. **finally() or |> specify I/O** - only at the end do you specify the real input/output/error types + +This eliminates interim `PipeConfiguration` objects with discarded I/O and makes pipeline construction clean and direct. + +## Basic Usage + +### Single Process +```swift +// Using .finally() method +let config = pipe( + .name("echo"), + arguments: ["Hello World"] +).finally( + output: .string(limit: .max) +) + +// Using |> operator (visually appealing!) +let config = pipe( + .name("echo"), + arguments: ["Hello World"] +) |> .string(limit: .max) + +let result = try await config.run() +print(result.standardOutput) // "Hello World" +``` + +### Pipeline with Stage Arrays + +**✅ Using .finally() method:** +```swift +let pipeline = (pipe( + .name("echo"), + arguments: ["apple\nbanana\ncherry"] +) | .name("sort") // ✅ Builds stage array + | .name("head") // ✅ Continues building array + | ( // ✅ Adds configured stage + .name("wc"), + arguments: ["-l"] + )).finally( + output: .string(limit: .max), // ✅ Only here we specify real I/O + error: .discarded +) +``` + +**✅ Using |> operator (clean and visually appealing!):** +```swift +let pipeline = pipe( + .name("echo"), + arguments: ["apple\nbanana\ncherry"] +) | .name("sort") // ✅ Builds stage array + | .name("head") // ✅ Continues building array + | ( // ✅ Adds configured stage + .name("wc"), + arguments: ["-l"] + ) |> ( // ✅ Visually appealing final I/O! + output: .string(limit: .max), + error: .discarded + ) + +let result = try await pipeline.run() +print(result.standardOutput) // "3" +``` + +## Error Redirection + +PipeConfiguration now supports three modes for handling standard error: + +### `.separate` (Default) +```swift +let config = pipe( + .name("sh"), + arguments: ["-c", "echo 'stdout'; echo 'stderr' >&2"], + options: .default // or ProcessStageOptions(errorRedirection: .separate) +) |> ( + output: .string(limit: .max), + error: .string(limit: .max) +) + +let result = try await config.run() +// result.standardOutput contains "stdout" +// result.standardError contains "stderr" +``` + +### `.replaceStdout` - Redirect stderr to stdout, discard original stdout +```swift +let config = pipe( + .name("sh"), + arguments: ["-c", "echo 'stdout'; echo 'stderr' >&2"], + options: .stderrToStdout // Convenience for .replaceStdout +) |> ( + output: .string(limit: .max), + error: .string(limit: .max) +) + +let result = try await config.run() +// result.standardOutput contains "stderr" (stdout was discarded) +// result.standardError contains "stderr" +``` + +### `.mergeWithStdout` - Both stdout and stderr go to the same destination +```swift +let config = pipe( + .name("sh"), + arguments: ["-c", "echo 'stdout'; echo 'stderr' >&2"], + options: .mergeErrors // Convenience for .mergeWithStdout +) |> ( + output: .string(limit: .max), + error: .string(limit: .max) +) + +let result = try await config.run() +// Both result.standardOutput and result.standardError contain both "stdout" and "stderr" +``` + +## Error Redirection in Pipelines + +### Using `withOptions()` helper +```swift +let pipeline = finally( + stages: pipe( + .name("sh"), + arguments: ["-c", "echo 'data'; echo 'warning' >&2"], + options: .mergeErrors // Merge stderr into stdout + ) | withOptions( + configuration: Configuration(executable: .name("grep"), arguments: ["warning"]), + options: .default + ) | ( + .name("wc"), + arguments: ["-l"] + ), + output: .string(limit: .max), + error: .discarded +) + +let result = try await pipeline.run() +// Should find the warning that was merged into stdout +``` + +### Using stage options +```swift +let pipeline = finally( + stages: pipe( + .name("find"), + arguments: ["/some/path"] + ) | ( + .name("grep"), + arguments: ["-v", "Permission denied"], + options: .stderrToStdout // Convert any stderr to stdout + ) | ( + .name("wc"), + arguments: ["-l"] + ), + output: .string(limit: .max), + error: .discarded +) +``` + +## Operator Variants + +### Stage Array Operators (`|`) +```swift +stages | (.name("grep")) // Add simple process stage +stages | Configuration(executable: ...) // Add configuration stage +stages | ( // Add with arguments and options + .name("sort"), + arguments: ["-r"], + options: .mergeErrors +) +stages | ( // Configuration with options + configuration: myConfig, + options: .stderrToStdout +) +stages | { input, output, error in // Add Swift function stage + // Swift function implementation + return 0 +} +``` + +### Final Operators (`|>`) +```swift +stages |> (output: .string(limit: .max), error: .discarded) // Simple final output +stages |> .string(limit: .max) // Output only (discarded error) +``` + +## Helper Functions + +### `finally()` - For creating PipeConfiguration from stage arrays +```swift +finally(stages: myStages, output: .string(limit: .max), error: .discarded) +finally(stages: myStages, output: .string(limit: .max)) // Auto-discard error +finally(stages: myStages, input: .string("data"), output: .string(limit: .max), error: .discarded) +``` + +## Real-World Examples + +### Log Processing with Error Handling +```swift +let logProcessor = pipe( + .name("tail"), + arguments: ["-f", "/var/log/app.log"], + options: .mergeErrors // Capture any tail errors as data +) | ( + .name("grep"), + arguments: ["-E", "(ERROR|WARN)"], + options: .stderrToStdout // Convert grep errors to output +) |> finally( + .name("head"), + arguments: ["-20"], + output: .string(limit: .max), + error: .string(limit: .max) // Capture final errors separately +) +``` + +### File Processing with Error Recovery +```swift +let fileProcessor = pipe( + .name("find"), + arguments: ["/data", "-name", "*.log", "-type", "f"], + options: .replaceStdout // Convert permission errors to "output" +) | ( + .name("head"), + arguments: ["-100"], // Process first 100 files/errors + options: .mergeErrors +) |> finally( + .name("wc"), + arguments: ["-l"], + output: .string(limit: .max), + error: .discarded +) +``` + +## Swift Functions with JSON Processing + +PipeConfiguration supports embedding Swift functions directly in pipelines, which is particularly powerful for JSON processing tasks where you need Swift's type safety and `Codable` support. + +### JSON Transformation Pipeline +```swift +struct InputData: Codable { + let items: [String] + let metadata: [String: String] +} + +struct OutputData: Codable { + let processedItems: [String] + let itemCount: Int + let processingDate: String +} + +let pipeline = pipe( + .name("echo"), + arguments: [#"{"items": ["apple", "banana", "cherry"], "metadata": {"source": "test"}}"#] +).pipe( + { input, output, err in + // Transform JSON structure with type safety + var jsonData = Data() + + for try await chunk in input.lines() { + jsonData.append(contentsOf: chunk.utf8) + } + + do { + let decoder = JSONDecoder() + let inputData = try decoder.decode(InputData.self, from: jsonData) + + let outputData = OutputData( + processedItems: inputData.items.map { $0.uppercased() }, + itemCount: inputData.items.count, + processingDate: ISO8601DateFormatter().string(from: Date()) + ) + + let encoder = JSONEncoder() + encoder.outputFormatting = .prettyPrinted + let outputJson = try encoder.encode(outputData) + let jsonString = String(data: outputJson, encoding: .utf8) ?? "" + + let written = try await output.write(jsonString) + return written > 0 ? 0 : 1 + } catch { + try await err.write("JSON transformation failed: \(error)") + return 1 + } + } +).finally( + output: .string(limit: .max), + error: .string(limit: .max) +) +``` + +### JSON Stream Processing +```swift +struct LogEntry: Codable { + let timestamp: String + let level: String + let message: String +} + +let logProcessor = pipe( + .name("tail"), + arguments: ["-f", "/var/log/app.log"] +).pipe( + { input, output, err in + // Process JSON log entries line by line + for try await line in input.lines() { + guard !line.isEmpty else { continue } + + do { + let decoder = JSONDecoder() + let logEntry = try decoder.decode(LogEntry.self, from: line.data(using: .utf8) ?? Data()) + + // Filter for error/warning logs and format output + if ["ERROR", "WARN"].contains(logEntry.level) { + let formatted = "[\(logEntry.timestamp)] \(logEntry.level): \(logEntry.message)" + _ = try await output.write(formatted + "\n") + } + } catch { + // Skip malformed JSON lines + continue + } + } + return 0 + } +).pipe( + .name("head"), + arguments: ["-20"] // Limit to first 20 error/warning entries +).finally( + output: .string(limit: .max), + error: .string(limit: .max) +) +``` + +### JSON Aggregation Pipeline +```swift +struct SalesRecord: Codable { + let product: String + let amount: Double + let date: String +} + +struct SalesSummary: Codable { + let totalSales: Double + let productCounts: [String: Int] + let averageSale: Double +} + +let salesAnalyzer = pipe( + .name("cat"), + arguments: ["sales_data.jsonl"] // JSON Lines format +).pipe( + { input, output, err in + // Aggregate JSON sales data with Swift collections + var totalSales: Double = 0 + var productCounts: [String: Int] = [:] + var recordCount = 0 + + for try await line in input.lines() { + guard !line.isEmpty else { continue } + + do { + let decoder = JSONDecoder() + let record = try decoder.decode(SalesRecord.self, from: line.data(using: .utf8) ?? Data()) + + totalSales += record.amount + productCounts[record.product, default: 0] += 1 + recordCount += 1 + } catch { + // Log parsing errors but continue processing + try await err.write("Failed to parse line: \(line)\n") + } + } + + let summary = SalesSummary( + totalSales: totalSales, + productCounts: productCounts, + averageSale: recordCount > 0 ? totalSales / Double(recordCount) : 0 + ) + + do { + let encoder = JSONEncoder() + encoder.outputFormatting = .prettyPrinted + let summaryJson = try encoder.encode(summary) + let jsonString = String(data: summaryJson, encoding: .utf8) ?? "" + + let written = try await output.write(jsonString) + return written > 0 ? 0 : 1 + } catch { + try await err.write("Failed to encode summary: \(error)") + return 1 + } + } +).finally( + output: .string(limit: .max), + error: .string(limit: .max) +) +``` + +### Combining Swift Functions with External Tools +```swift +struct User: Codable { + let id: Int + let username: String + let email: String +} + +let usersJson = #"[{"id": 1, "username": "alice", "email": "alice@example.com"}, {"id": 2, "username": "bob", "email": "bob@example.com"}, {"id": 3, "username": "charlie", "email": "charlie@example.com"}, {"id": 6, "username": "dave", "email": "dave@example.com"}]"# + +let userProcessor = pipe( + .name("echo"), + arguments: [usersJson] +).pipe( + { input, output, err in + // Decode JSON and filter with Swift + var jsonData = Data() + + for try await chunk in input.lines() { + jsonData.append(contentsOf: chunk.utf8) + } + + do { + let decoder = JSONDecoder() + let users = try decoder.decode([User].self, from: jsonData) + + // Filter and transform users with Swift + let filteredUsers = users.filter { $0.id <= 5 } + let usernames = filteredUsers.map { $0.username }.joined(separator: "\n") + + let written = try await output.write(usernames) + return written > 0 ? 0 : 1 + } catch { + try await err.write("JSON decoding failed: \(error)") + return 1 + } + } +).pipe( + .name("sort") // Use external tool for sorting +).finally( + output: .string(limit: .max), + error: .string(limit: .max) +) +``` + +### Benefits of Swift Functions in Pipelines + +1. **Type Safety**: Use Swift's `Codable` for guaranteed JSON parsing +2. **Error Handling**: Robust error handling with Swift's `do-catch` +3. **Performance**: In-memory processing without external tool overhead +4. **Integration**: Seamless mixing with traditional Unix tools +5. **Maintainability**: Readable, testable Swift code within pipelines + +## ProcessStageOptions Reference + +```swift +// Predefined options +ProcessStageOptions.default // .separate - keep stdout/stderr separate +ProcessStageOptions.stderrToStdout // .replaceStdout - stderr becomes stdout +ProcessStageOptions.mergeErrors // .mergeWithStdout - both to same destination + +// Custom options +ProcessStageOptions(errorRedirection: .separate) +ProcessStageOptions(errorRedirection: .replaceStdout) +ProcessStageOptions(errorRedirection: .mergeWithStdout) +``` + +## Type Safety + +The generic parameters ensure compile-time safety: + +```swift +// Input type from first process +PipeConfiguration, DiscardedOutput> + +// Intermediate processes can have different error handling +// Final process can change output/error types +pipeline |> finally( + .name("wc"), + output: .string(limit: .max), // New output type + error: .fileDescriptor(errorFile) // New error type +) // Result: PipeConfiguration, FileDescriptorOutput> +``` diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index f1e7317..4f309d8 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -42,11 +42,13 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { private let diskIO: DiskIO private let preferredBufferSize: Int + private let isAsyncIO: Bool private var buffer: [Buffer] - internal init(diskIO: DiskIO, preferredBufferSize: Int?) { + internal init(diskIO: DiskIO, preferredBufferSize: Int?, isAsyncIO: Bool) { self.diskIO = diskIO self.buffer = [] + self.isAsyncIO = isAsyncIO self.preferredBufferSize = preferredBufferSize ?? readBufferSize } @@ -60,7 +62,8 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { // Read more data let data = try await AsyncIO.shared.read( from: self.diskIO, - upTo: self.preferredBufferSize + upTo: self.preferredBufferSize, + isAsyncIO: self.isAsyncIO ) guard let data else { // We finished reading. Close the file descriptor now @@ -87,17 +90,20 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { private let diskIO: DiskIO private let preferredBufferSize: Int? + private let isAsyncIO: Bool - internal init(diskIO: DiskIO, preferredBufferSize: Int?) { + internal init(diskIO: DiskIO, preferredBufferSize: Int?, isAsyncIO: Bool = true) { self.diskIO = diskIO self.preferredBufferSize = preferredBufferSize + self.isAsyncIO = isAsyncIO } /// Creates a iterator for this asynchronous sequence. public func makeAsyncIterator() -> Iterator { return Iterator( diskIO: self.diskIO, - preferredBufferSize: self.preferredBufferSize + preferredBufferSize: self.preferredBufferSize, + isAsyncIO: self.isAsyncIO, ) } diff --git a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift index 3bebe12..70e8c9d 100644 --- a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift +++ b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift @@ -31,17 +31,20 @@ final class AsyncIO: Sendable { internal func read( from diskIO: borrowing IOChannel, - upTo maxLength: Int + upTo maxLength: Int, + isAsyncIO: Bool = true, ) async throws -> DispatchData? { return try await self.read( from: diskIO.channel, upTo: maxLength, + isAsyncIO: isAsyncIO, ) } internal func read( from dispatchIO: DispatchIO, - upTo maxLength: Int + upTo maxLength: Int, + isAsyncIO: Bool, ) async throws -> DispatchData? { return try await withCheckedThrowingContinuation { continuation in var buffer: DispatchData = .empty diff --git a/Sources/Subprocess/IO/AsyncIO+Linux.swift b/Sources/Subprocess/IO/AsyncIO+Linux.swift index cd9c63f..0d503ac 100644 --- a/Sources/Subprocess/IO/AsyncIO+Linux.swift +++ b/Sources/Subprocess/IO/AsyncIO+Linux.swift @@ -287,6 +287,16 @@ final class AsyncIO: Sendable { &event ) if rc != 0 { + if errno == EPERM { + // Special Case: + // + // * EPERM can happen when this is a regular file (not pipe, socket, etc.) which is available right away for read/write, + // so we just go ahead and yield for I/O on the file descriptor. There's no need to wait. + // + continuation.yield(true) + return + } + _registration.withLock { storage in _ = storage.removeValue(forKey: fileDescriptor.rawValue) } @@ -318,7 +328,15 @@ final class AsyncIO: Sendable { fileDescriptor.rawValue, nil ) - guard rc == 0 else { + + // Special Cases: + // + // * EPERM is set if the file descriptor is a regular file (not pipe, socket, etc.) and so it was never + // registered with epoll. + // * ENOENT is set if the file descriptor is unknown to epoll, so we an just continue and remove it + // from registration. + // + if rc != 0 && errno != EPERM && errno != ENOENT { throw SubprocessError( code: .init( .asyncIOFailed( @@ -327,6 +345,7 @@ final class AsyncIO: Sendable { underlyingError: .init(rawValue: errno) ) } + _registration.withLock { store in _ = store.removeValue(forKey: fileDescriptor.rawValue) } @@ -348,14 +367,16 @@ extension AsyncIO { func read( from diskIO: borrowing IOChannel, - upTo maxLength: Int + upTo maxLength: Int, + isAsyncIO: Bool = true, ) async throws -> [UInt8]? { - return try await self.read(from: diskIO.channel, upTo: maxLength) + return try await self.read(from: diskIO.channel, upTo: maxLength, isAsyncIO: isAsyncIO) } func read( from fileDescriptor: FileDescriptor, - upTo maxLength: Int + upTo maxLength: Int, + isAsyncIO: Bool, ) async throws -> [UInt8]? { guard maxLength > 0 else { return nil diff --git a/Sources/Subprocess/IO/AsyncIO+Windows.swift b/Sources/Subprocess/IO/AsyncIO+Windows.swift index ff4c8f3..eceb16a 100644 --- a/Sources/Subprocess/IO/AsyncIO+Windows.swift +++ b/Sources/Subprocess/IO/AsyncIO+Windows.swift @@ -245,14 +245,16 @@ final class AsyncIO: @unchecked Sendable { func read( from diskIO: borrowing IOChannel, - upTo maxLength: Int + upTo maxLength: Int, + isAsyncIO: Bool = true, ) async throws -> [UInt8]? { - return try await self.read(from: diskIO.channel, upTo: maxLength) + return try await self.read(from: diskIO.channel, upTo: maxLength, isAsyncIO: isAsyncIO) } func read( from handle: HANDLE, - upTo maxLength: Int + upTo maxLength: Int, + isAsyncIO: Bool, ) async throws -> [UInt8]? { guard maxLength > 0 else { return nil @@ -264,7 +266,70 @@ final class AsyncIO: @unchecked Sendable { var resultBuffer: [UInt8] = Array( repeating: 0, count: bufferLength ) + var readLength: Int = 0 + + // We can't be certain that the HANDLE has overlapping I/O enabled on it, so + // here we fall back to synchronous reads. + guard isAsyncIO else { + while true { + let (succeed, bytesRead) = try resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in + // Get a pointer to the memory at the specified offset + // Windows ReadFile uses DWORD for target count, which means we can only + // read up to DWORD (aka UInt32) max. + let targetCount: DWORD = self.calculateRemainingCount( + totalCount: bufferPointer.count, + readCount: readLength + ) + + var bytesRead = UInt32(0) + let offsetAddress = bufferPointer.baseAddress!.advanced(by: readLength) + // Read directly into the buffer at the offset + return ( + ReadFile( + handle, + offsetAddress, + targetCount, + &bytesRead, + nil + ), bytesRead + ) + } + + guard succeed else { + let error = SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: GetLastError()) + ) + throw error + } + + if bytesRead == 0 { + // We reached EOF. Return whatever's left + guard readLength > 0 else { + return nil + } + resultBuffer.removeLast(resultBuffer.count - readLength) + return resultBuffer + } else { + // Read some data + readLength += Int(truncatingIfNeeded: bytesRead) + if maxLength == .max { + // Grow resultBuffer if needed + guard Double(readLength) > 0.8 * Double(resultBuffer.count) else { + continue + } + resultBuffer.append( + contentsOf: Array(repeating: 0, count: resultBuffer.count) + ) + } else if readLength >= maxLength { + // When we reached maxLength, return! + return resultBuffer + } + } + } + } + var signalStream = self.registerHandle(handle).makeAsyncIterator() while true { diff --git a/Sources/Subprocess/PipeConfiguration.swift b/Sources/Subprocess/PipeConfiguration.swift new file mode 100644 index 0000000..83f01cb --- /dev/null +++ b/Sources/Subprocess/PipeConfiguration.swift @@ -0,0 +1,906 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if canImport(System) +@preconcurrency import System +#else +@preconcurrency import SystemPackage +#endif + +#if canImport(Foundation) +import Foundation +#endif + +#if canImport(Darwin) +import Darwin +#elseif canImport(Android) +import Android +#elseif canImport(Glibc) +import Glibc +#elseif canImport(Musl) +import Musl +#elseif canImport(WinSDK) +@preconcurrency import WinSDK +#endif + +internal import Dispatch + +// MARK: - Custom Operators + +/// Final pipe operator - pipes to the last process in a pipeline and specifies output +infix operator |> : AdditionPrecedence + +// MARK: - Error Redirection Options + +/// Options for redirecting standard error in process pipelines +public enum ErrorRedirection: Sendable { + /// Keep stderr separate (default behavior) + case separate + /// Merge stderr into stdout (both go to the same destination) + case mergeWithStdout +} + +/// Configuration for error redirection in process stages +public struct ProcessStageOptions: Sendable { + /// How to handle standard error redirection + public let errorRedirection: ErrorRedirection + + /// Initialize with error redirection option + public init(errorRedirection: ErrorRedirection = .separate) { + self.errorRedirection = errorRedirection + } + + /// Default options (no redirection) + public static let `default` = ProcessStageOptions() + + /// Merge stderr with stdout + public static let mergeErrors = ProcessStageOptions(errorRedirection: .mergeWithStdout) +} + +// MARK: - PipeStage (Public API) + +/// A single stage in a process pipeline +public struct PipeStage: Sendable { + enum StageType: Sendable { + case process(configuration: Configuration, options: ProcessStageOptions) + case swiftFunction(@Sendable (AsyncBufferSequence, StandardInputWriter, StandardInputWriter) async throws -> UInt32) + } + + let stageType: StageType + + /// Create a PipeStage from a process configuration + public init( + configuration: Configuration, + options: ProcessStageOptions = .default + ) { + self.stageType = .process(configuration: configuration, options: options) + } + + /// Create a PipeStage from executable parameters + public init( + _ executable: Executable, + arguments: Arguments = [], + environment: Environment = .inherit, + workingDirectory: FilePath? = nil, + platformOptions: PlatformOptions = PlatformOptions(), + options: ProcessStageOptions = .default + ) { + let configuration = Configuration( + executable: executable, + arguments: arguments, + environment: environment, + workingDirectory: workingDirectory, + platformOptions: platformOptions + ) + self.stageType = .process(configuration: configuration, options: options) + } + + /// Create a PipeStage from a Swift function + public init( + swiftFunction: @escaping @Sendable (AsyncBufferSequence, StandardInputWriter, StandardInputWriter) async throws -> UInt32 + ) { + self.stageType = .swiftFunction(swiftFunction) + } + + // Convenience accessors + var configuration: Configuration? { + switch stageType { + case .process(let configuration, _): + return configuration + case .swiftFunction: + return nil + } + } + + var options: ProcessStageOptions { + switch stageType { + case .process(_, let options): + return options + case .swiftFunction: + return .default + } + } +} + +/// A struct that encapsulates one or more pipe stages in a pipeline +/// with overall I/O specification for input, output and errors. +/// A pipe stage can be either a process with options for reconfiguring +/// standard output and standard error, or a Swift function that can +/// stream standard input, output, and error with an exit code. +public struct PipeConfiguration< + Input: InputProtocol, + Output: OutputProtocol, + Error: ErrorOutputProtocol +>: Sendable, CustomStringConvertible { + /// Array of process stages in the pipeline + internal var stages: [PipeStage] + + /// Input configuration for the first stage + internal var input: Input + + /// Output configuration for the last stage + internal var output: Output + + /// Error configuration for the last stage + internal var error: Error + + /// Initialize a PipeConfiguration with a base Configuration + /// Internal initializer - users should use convenience initializers + internal init( + configuration: Configuration, + input: Input, + output: Output, + error: Error, + options: ProcessStageOptions = .default + ) { + self.stages = [PipeStage(configuration: configuration, options: options)] + self.input = input + self.output = output + self.error = error + } + + /// Internal initializer for creating from stages and I/O + internal init( + stages: [PipeStage], + input: Input, + output: Output, + error: Error + ) { + self.stages = stages + self.input = input + self.output = output + self.error = error + } + + // MARK: - CustomStringConvertible + + public var description: String { + if stages.count == 1 { + let stage = stages[0] + switch stage.stageType { + case .process(let configuration, _): + return "PipeConfiguration(\(configuration.executable))" + case .swiftFunction: + return "PipeConfiguration(swiftFunction)" + } + } else { + return "Pipeline with \(stages.count) stages" + } + } +} + +/// Helper enum for pipeline task results +internal enum PipelineTaskResult: Sendable { + case success(Int, SendableCollectedResult) + case failure(Int, Swift.Error) +} + +/// Sendable wrapper for CollectedResult +internal struct SendableCollectedResult: @unchecked Sendable { + let processIdentifier: ProcessIdentifier + let terminationStatus: TerminationStatus + let standardOutput: Any + let standardError: Any + + init(_ result: CollectedResult) { + self.processIdentifier = result.processIdentifier + self.terminationStatus = result.terminationStatus + self.standardOutput = result.standardOutput + self.standardError = result.standardError + } +} + +private func currentProcessIdentifier() -> ProcessIdentifier { + #if os(macOS) + return .init(value: ProcessInfo.processInfo.processIdentifier) + #elseif canImport(Glibc) || canImport(Android) || canImport(Musl) + return .init(value: ProcessInfo.processInfo.processIdentifier, processDescriptor: -1) + #elseif os(Windows) + return .init(value: UInt32(ProcessInfo.processInfo.processIdentifier), processDescriptor: INVALID_HANDLE_VALUE, threadHandle: INVALID_HANDLE_VALUE) + #endif +} + +private func createIODescriptor(from fd: FileDescriptor, closeWhenDone: Bool) -> IODescriptor { + #if canImport(WinSDK) + return IODescriptor(HANDLE(bitPattern: _get_osfhandle(fd.rawValue))!, closeWhenDone: closeWhenDone) + #else + return IODescriptor(fd, closeWhenDone: closeWhenDone) + #endif +} + +private func createTerminationStatus(_ exitCode: UInt32) -> TerminationStatus { + #if canImport(WinSDK) + return .exited(exitCode) + #else + return .exited(Int32(exitCode)) + #endif +} + +private func createPipe() throws -> (readEnd: FileDescriptor, writeEnd: FileDescriptor) { + var createdPipe = try CreatedPipe(closeWhenDone: false, purpose: .output) + + #if canImport(WinSDK) + let readHandle = createdPipe.readFileDescriptor()!.platformDescriptor() + let writeHandle = createdPipe.writeFileDescriptor()!.platformDescriptor() + let readFd = _open_osfhandle( + intptr_t(bitPattern: readHandle), + FileDescriptor.AccessMode.readOnly.rawValue + ) + let writeFd = _open_osfhandle( + intptr_t(bitPattern: writeHandle), + FileDescriptor.AccessMode.writeOnly.rawValue + ) + #else + let readFd = createdPipe.readFileDescriptor()!.platformDescriptor() + let writeFd = createdPipe.writeFileDescriptor()!.platformDescriptor() + #endif + + return (readEnd: FileDescriptor(rawValue: readFd), writeEnd: FileDescriptor(rawValue: writeFd)) +} + +// MARK: - Internal Functions + +extension PipeConfiguration { + public func run() async throws -> CollectedResult { + guard stages.count > 1 else { + fatalError("Trivial pipeline with only a single stage isn't supported") + } + + // Pipeline - run with task group + return try await runPipeline() + } + + enum CollectedPipeResult { + case stderr(Error.OutputType) + case collectedResult(CollectedResult) + } + + /// Run the pipeline using withTaskGroup + private func runPipeline() async throws -> CollectedResult { + // Create a pipe for standard error + let sharedErrorPipe = try createPipe() + // FIXME: Use _safelyClose() to fully close each end of the pipe on all platforms + + return try await withThrowingTaskGroup(of: CollectedPipeResult.self, returning: CollectedResult.self) { group in + // Collect error output from all stages + group.addTask { + let errorReadFileDescriptor = createIODescriptor(from: sharedErrorPipe.readEnd, closeWhenDone: true) + let errorReadEnd = errorReadFileDescriptor.createIOChannel() + let stderr = try await self.error.captureOutput(from: errorReadEnd) + return .stderr(stderr) + } + + // Perform the main task of assembling the pipeline, I/O and exit code + group.addTask { + // Create pipes between stages + var pipes: [(readEnd: FileDescriptor, writeEnd: FileDescriptor)] = [] + for _ in 0..<(stages.count - 1) { + try pipes.append(createPipe()) + // FIXME: Use _safelyClose() to fully close each end of the pipe on all platforms + } + + let pipeResult = try await withThrowingTaskGroup(of: PipelineTaskResult.self, returning: CollectedResult.self) { group in + // First process + let firstStage = stages[0] + if stages.count > 1 { + let writeEnd = pipes[0].writeEnd + group.addTask { + do { + switch firstStage.stageType { + case .process(let configuration, let options): + var taskResult: PipelineTaskResult + + switch options.errorRedirection { + case .separate: + let originalResult = try await Subprocess.run( + configuration, + input: self.input, + output: .fileDescriptor(writeEnd, closeAfterSpawningProcess: true), + error: FileDescriptorOutput(fileDescriptor: sharedErrorPipe.writeEnd, closeAfterSpawningProcess: false) + ) + + taskResult = PipelineTaskResult.success( + 0, + SendableCollectedResult( + CollectedResult( + processIdentifier: originalResult.processIdentifier, + terminationStatus: originalResult.terminationStatus, + standardOutput: (), + standardError: () + ))) + case .mergeWithStdout: + let originalResult = try await Subprocess.run( + configuration, + input: self.input, + output: .fileDescriptor(writeEnd, closeAfterSpawningProcess: true), + error: .combineWithOutput + ) + + taskResult = PipelineTaskResult.success( + 0, + SendableCollectedResult( + CollectedResult( + processIdentifier: originalResult.processIdentifier, + terminationStatus: originalResult.terminationStatus, + standardOutput: (), + standardError: () + ))) + } + + return taskResult + case .swiftFunction(let function): + var inputPipe: CreatedPipe = try self.input.createPipe() + + let inputReadFileDescriptor: IODescriptor? = inputPipe.readFileDescriptor() + var inputWriteFileDescriptor: IODescriptor? = inputPipe.writeFileDescriptor() + + var inputReadEnd = inputReadFileDescriptor?.createIOChannel() + var inputWriteEnd: IOChannel? = inputWriteFileDescriptor.take()?.createIOChannel() + + let outputWriteFileDescriptor = createIODescriptor(from: writeEnd, closeWhenDone: true) + var outputWriteEnd: IOChannel? = outputWriteFileDescriptor.createIOChannel() + + // Use shared error pipe instead of discarded + let errorWriteFileDescriptor = createIODescriptor(from: sharedErrorPipe.writeEnd, closeWhenDone: false) + var errorWriteEnd: IOChannel? = errorWriteFileDescriptor.createIOChannel() + + let result = try await withThrowingTaskGroup(of: UInt32.self) { group in + let inputReadEnd = inputReadEnd.take()! + let outputWriteEnd = outputWriteEnd.take()! + let errorWriteEnd = errorWriteEnd.take()! + + var inputAsyncIO = false + if let inputWriteEnd = inputWriteEnd.take() { + inputAsyncIO = true + let writer = StandardInputWriter(diskIO: inputWriteEnd) + group.addTask { + try await self.input.write(with: writer) + try await writer.finish() + return 0 + } + } + + // FIXME figure out how to propagate a preferred buffer size to this sequence + let inSequence = AsyncBufferSequence(diskIO: inputReadEnd.consumeIOChannel(), preferredBufferSize: nil, isAsyncIO: inputAsyncIO) + let outWriter = StandardInputWriter(diskIO: outputWriteEnd) + let errWriter = StandardInputWriter(diskIO: errorWriteEnd) + + group.addTask { + do { + let retVal = try await function(inSequence, outWriter, errWriter) + + // Close outputs in case the function did not + try await outWriter.finish() + try await errWriter.finish() + + return retVal + } catch { + // Close outputs in case the function did not + try await outWriter.finish() + try await errWriter.finish() + throw error + } + } + + for try await t in group { + if t != 0 { + return t + } + } + + return 0 + } + + return PipelineTaskResult.success( + 0, + SendableCollectedResult( + CollectedResult( + processIdentifier: currentProcessIdentifier(), + terminationStatus: createTerminationStatus(result), + standardOutput: (), + standardError: () + ))) + } + } catch { + return PipelineTaskResult.failure(0, error) + } + } + } + + // Middle processes + for i in 1..<(stages.count - 1) { + let stage = stages[i] + let readEnd = pipes[i - 1].readEnd + let writeEnd = pipes[i].writeEnd + group.addTask { + do { + switch stage.stageType { + case .process(let configuration, let options): + var taskResult: PipelineTaskResult + switch options.errorRedirection { + case .separate: + let originalResult = try await Subprocess.run( + configuration, + input: .fileDescriptor(readEnd, closeAfterSpawningProcess: true), + output: .fileDescriptor(writeEnd, closeAfterSpawningProcess: true), + error: FileDescriptorOutput(fileDescriptor: sharedErrorPipe.writeEnd, closeAfterSpawningProcess: false) + ) + + taskResult = PipelineTaskResult.success( + i, + SendableCollectedResult( + CollectedResult( + processIdentifier: originalResult.processIdentifier, + terminationStatus: originalResult.terminationStatus, + standardOutput: (), + standardError: () + ))) + case .mergeWithStdout: + let originalResult = try await Subprocess.run( + configuration, + input: .fileDescriptor(readEnd, closeAfterSpawningProcess: true), + output: .fileDescriptor(writeEnd, closeAfterSpawningProcess: true), + error: .combineWithOutput + ) + + taskResult = PipelineTaskResult.success( + i, + SendableCollectedResult( + CollectedResult( + processIdentifier: originalResult.processIdentifier, + terminationStatus: originalResult.terminationStatus, + standardOutput: (), + standardError: () + ))) + } + + return taskResult + case .swiftFunction(let function): + let inputReadFileDescriptor = createIODescriptor(from: readEnd, closeWhenDone: true) + var inputReadEnd: IOChannel? = inputReadFileDescriptor.createIOChannel() + + let outputWriteFileDescriptor = createIODescriptor(from: writeEnd, closeWhenDone: true) + var outputWriteEnd: IOChannel? = outputWriteFileDescriptor.createIOChannel() + + // Use shared error pipe instead of discarded + let errorWriteFileDescriptor: IODescriptor = createIODescriptor(from: sharedErrorPipe.writeEnd, closeWhenDone: false) + var errorWriteEnd: IOChannel? = errorWriteFileDescriptor.createIOChannel() + + let result = try await withThrowingTaskGroup(of: UInt32.self) { group in + // FIXME figure out how to propagate a preferred buffer size to this sequence + let inSequence = AsyncBufferSequence(diskIO: inputReadEnd.take()!.consumeIOChannel(), preferredBufferSize: nil) + let outWriter = StandardInputWriter(diskIO: outputWriteEnd.take()!) + let errWriter = StandardInputWriter(diskIO: errorWriteEnd.take()!) + + group.addTask { + do { + let result = try await function(inSequence, outWriter, errWriter) + + // Close outputs in case the function did not + try await outWriter.finish() + try await errWriter.finish() + + return result + } catch { + // Close outputs in case the function did not + try await outWriter.finish() + try await errWriter.finish() + throw error + } + } + + for try await t in group { + if t != 0 { + return t + } + } + + return 0 + } + + return PipelineTaskResult.success( + i, + SendableCollectedResult( + CollectedResult( + processIdentifier: currentProcessIdentifier(), + terminationStatus: createTerminationStatus(result), + standardOutput: (), + standardError: () + ))) + } + } catch { + return PipelineTaskResult.failure(i, error) + } + } + } + + // Last process (if there are multiple stages) + if stages.count > 1 { + let lastIndex = stages.count - 1 + let lastStage = stages[lastIndex] + let readEnd = pipes[lastIndex - 1].readEnd + group.addTask { + do { + switch lastStage.stageType { + case .process(let configuration, let options): + switch options.errorRedirection { + case .separate: + let finalResult = try await Subprocess.run( + configuration, + input: .fileDescriptor(readEnd, closeAfterSpawningProcess: true), + output: self.output, + error: FileDescriptorOutput(fileDescriptor: sharedErrorPipe.writeEnd, closeAfterSpawningProcess: false) + ) + return PipelineTaskResult.success(lastIndex, SendableCollectedResult(finalResult)) + case .mergeWithStdout: + return PipelineTaskResult.success( + lastIndex, + SendableCollectedResult( + try await Subprocess.run( + configuration, + input: .fileDescriptor(readEnd, closeAfterSpawningProcess: true), + output: self.output, + error: .combineWithOutput + ))) + } + case .swiftFunction(let function): + let inputReadFileDescriptor = createIODescriptor(from: readEnd, closeWhenDone: true) + var inputReadEnd: IOChannel? = inputReadFileDescriptor.createIOChannel() + + var outputPipe = try self.output.createPipe() + let outputWriteFileDescriptor = outputPipe.writeFileDescriptor() + var outputWriteEnd: IOChannel? = outputWriteFileDescriptor?.createIOChannel() + + // Use shared error pipe instead of discarded + let errorWriteFileDescriptor = createIODescriptor(from: sharedErrorPipe.writeEnd, closeWhenDone: false) + var errorWriteEnd: IOChannel? = errorWriteFileDescriptor.createIOChannel() + + let result: (UInt32, Output.OutputType) = try await withThrowingTaskGroup(of: (UInt32, OutputCapturingState?).self) { group in + // FIXME figure out how to propagate a preferred buffer size to this sequence + let inSequence = AsyncBufferSequence(diskIO: inputReadEnd.take()!.consumeIOChannel(), preferredBufferSize: nil) + let outWriter = StandardInputWriter(diskIO: outputWriteEnd.take()!) + let errWriter = StandardInputWriter(diskIO: errorWriteEnd.take()!) + + let outputReadFileDescriptor = outputPipe.readFileDescriptor() + var outputReadEnd = outputReadFileDescriptor?.createIOChannel() + group.addTask { + let readEnd = outputReadEnd.take() + let stdout = try await self.output.captureOutput(from: readEnd) + return (0, .standardOutputCaptured(stdout)) + } + + group.addTask { + do { + let retVal = try await function(inSequence, outWriter, errWriter) + try await outWriter.finish() + try await errWriter.finish() + return (retVal, .none) + } catch { + try await outWriter.finish() + try await errWriter.finish() + throw error + } + } + + var exitCode: UInt32 = 0 + var output: Output.OutputType? = nil + for try await r in group { + if r.0 != 0 { + exitCode = r.0 + } + + if case (_, .standardOutputCaptured(let stdout)) = r { + output = stdout + } + } + + return (exitCode, output!) + } + + return PipelineTaskResult.success( + lastIndex, + SendableCollectedResult( + CollectedResult( + processIdentifier: currentProcessIdentifier(), + terminationStatus: createTerminationStatus(result.0), + standardOutput: result.1, + standardError: () + ))) + } + } catch { + return PipelineTaskResult.failure(lastIndex, error) + } + } + } + + // Collect all results + var errors: [Swift.Error] = [] + var lastStageResult: SendableCollectedResult? + + for try await result in group { + switch result { + case .success(let index, let collectedResult): + if index == stages.count - 1 { + // This is the final stage result we want to return + lastStageResult = collectedResult + } + case .failure(_, let error): + errors.append(error) + } + } + + // Close the shared error pipe now that all processes have finished so that + // the standard error can be collected. + try sharedErrorPipe.writeEnd.close() + + if !errors.isEmpty { + throw errors[0] // Throw the first error + } + + guard let lastResult = lastStageResult else { + throw SubprocessError(code: .init(.asyncIOFailed("Pipeline execution failed")), underlyingError: nil) + } + + // Create a properly typed CollectedResult from the SendableCollectedResult with shared error + return CollectedResult( + processIdentifier: lastResult.processIdentifier, + terminationStatus: lastResult.terminationStatus, + standardOutput: lastResult.standardOutput as! Output.OutputType, + standardError: () + ) + } + + return .collectedResult(pipeResult) + } + + var stderr: Error.OutputType? + var collectedResult: CollectedResult? + + for try await result in group { + switch result { + case .collectedResult(let pipeResult): + collectedResult = pipeResult + case .stderr(let err): + stderr = err + } + } + + return CollectedResult( + processIdentifier: collectedResult!.processIdentifier, + terminationStatus: collectedResult!.terminationStatus, + standardOutput: collectedResult!.standardOutput, + standardError: stderr! + ) + } + } +} + +// MARK: - Top-Level Pipe Functions (Return Stage Arrays) + +/// Create a single-stage pipeline with an executable +public func pipe( + _ executable: Executable, + arguments: Arguments = [], + environment: Environment = .inherit, + workingDirectory: FilePath? = nil, + platformOptions: PlatformOptions = PlatformOptions(), + options: ProcessStageOptions = .default +) -> [PipeStage] { + return [ + PipeStage( + executable, + arguments: arguments, + environment: environment, + workingDirectory: workingDirectory, + platformOptions: platformOptions, + options: options + ) + ] +} + +/// Create a single-stage pipeline with a Configuration +public func pipe( + _ configuration: Configuration, + options: ProcessStageOptions = .default +) -> [PipeStage] { + return [PipeStage(configuration: configuration, options: options)] +} + +/// Create a single-stage pipeline with a Swift function +public func pipe( + _ swiftFunction: @escaping @Sendable (AsyncBufferSequence, StandardInputWriter, StandardInputWriter) async throws -> UInt32 +) -> [PipeStage] { + return [PipeStage(swiftFunction: swiftFunction)] +} + +// MARK: - Stage Array Operators + +/// Pipe operator for stage arrays - adds a process stage +public func | ( + left: [PipeStage], + right: PipeStage +) -> [PipeStage] { + return left + [right] +} + +/// Pipe operator for stage arrays with Configuration +public func | ( + left: [PipeStage], + right: Configuration +) -> [PipeStage] { + return left + [PipeStage(configuration: right, options: .default)] +} + +/// Pipe operator for stage arrays with simple executable +public func | ( + left: [PipeStage], + right: Executable +) -> [PipeStage] { + let configuration = Configuration(executable: right) + return left + [PipeStage(configuration: configuration, options: .default)] +} + +/// Pipe operator for stage arrays with tuple for executable and arguments +public func | ( + left: [PipeStage], + right: (Executable, arguments: Arguments) +) -> [PipeStage] { + let configuration = Configuration(executable: right.0, arguments: right.arguments) + return left + [PipeStage(configuration: configuration, options: .default)] +} + +/// Pipe operator for stage arrays with tuple for executable, arguments, and process stage options +public func | ( + left: [PipeStage], + right: (Executable, arguments: Arguments, options: ProcessStageOptions) +) -> [PipeStage] { + let configuration = Configuration(executable: right.0, arguments: right.arguments) + return left + [PipeStage(configuration: configuration, options: right.options)] +} + +/// Pipe operator for stage arrays with Swift function +public func | ( + left: [PipeStage], + right: @escaping @Sendable (AsyncBufferSequence, StandardInputWriter, StandardInputWriter) async throws -> UInt32 +) -> [PipeStage] { + return left + [PipeStage(swiftFunction: right)] +} + +/// Pipe operator for stage arrays with process helper +public func | ( + left: [PipeStage], + right: (configuration: Configuration, options: ProcessStageOptions) +) -> [PipeStage] { + return left + [PipeStage(configuration: right.configuration, options: right.options)] +} + +// MARK: - Finally Methods for Stage Arrays (Extension) + +extension Array where Element == PipeStage { + /// Add a new stage to the pipeline with executable and arguments + public func stage( + _ executable: Executable, + arguments: Arguments = [], + environment: Environment = .inherit, + workingDirectory: FilePath? = nil, + platformOptions: PlatformOptions = PlatformOptions(), + options: ProcessStageOptions = .default + ) -> [PipeStage] { + return self + [ + PipeStage( + executable, + arguments: arguments, + environment: environment, + workingDirectory: workingDirectory, + platformOptions: platformOptions, + options: options + ) + ] + } + + /// Add a new stage to the pipeline with a Configuration + public func stage( + _ configuration: Configuration, + options: ProcessStageOptions = .default + ) -> [PipeStage] { + return self + [PipeStage(configuration: configuration, options: options)] + } + + /// Add a new stage to the pipeline with a Swift function + public func stage( + _ swiftFunction: @escaping @Sendable (AsyncBufferSequence, StandardInputWriter, StandardInputWriter) async throws -> UInt32 + ) -> [PipeStage] { + return self + [PipeStage(swiftFunction: swiftFunction)] + } + + /// Add a new stage to the pipeline with a configuration and options + public func stage( + configuration: Configuration, + options: ProcessStageOptions + ) -> [PipeStage] { + return self + [PipeStage(configuration: configuration, options: options)] + } + + /// Create a PipeConfiguration from stages with specific input, output, and error types + public func finally( + input: FinalInput, + output: FinalOutput, + error: FinalError + ) -> PipeConfiguration { + return PipeConfiguration( + stages: self, + input: input, + output: output, + error: error + ) + } + + /// Create a PipeConfiguration from stages with no input and specific output and error types + public func finally( + output: FinalOutput, + error: FinalError + ) -> PipeConfiguration { + return self.finally(input: NoInput(), output: output, error: error) + } + + /// Create a PipeConfiguration from stages with no input, specific output, and discarded error + public func finally( + output: FinalOutput + ) -> PipeConfiguration { + return self.finally(input: NoInput(), output: output, error: DiscardedOutput()) + } +} + +/// Final pipe operator for stage arrays with specific input, output and error types +public func |> ( + left: [PipeStage], + right: (input: FinalInput, output: FinalOutput, error: FinalError) +) -> PipeConfiguration { + return left.finally(input: right.input, output: right.output, error: right.error) +} + +/// Final pipe operator for stage arrays with specific output and error types +public func |> ( + left: [PipeStage], + right: (output: FinalOutput, error: FinalError) +) -> PipeConfiguration { + return left.finally(output: right.output, error: right.error) +} + +/// Final pipe operator for stage arrays with specific output only (discarded error) +public func |> ( + left: [PipeStage], + right: FinalOutput +) -> PipeConfiguration { + return left.finally(output: right) +} + +// MARK: - Helper Functions diff --git a/Tests/SubprocessTests/PipeConfigurationTests.swift b/Tests/SubprocessTests/PipeConfigurationTests.swift new file mode 100644 index 0000000..21d187b --- /dev/null +++ b/Tests/SubprocessTests/PipeConfigurationTests.swift @@ -0,0 +1,1471 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if canImport(System) +@preconcurrency import System +#else +@preconcurrency import SystemPackage +#endif + +import Foundation +import Testing +@testable import Subprocess + +protocol Configurable { + var configuration: Configuration { get } +} + +func pipe( + _ configurable: any Configurable, + options: ProcessStageOptions = .default +) -> [PipeStage] { + return [PipeStage(configuration: configurable.configuration, options: options)] +} + +extension [PipeStage] { + func stage( + _ configurable: any Configurable, + options: ProcessStageOptions = .default + ) -> [PipeStage] { + return self.stage(configurable.configuration, options: options) + } +} + +/// Pipe operator for stage arrays with Configuration +func | ( + left: [PipeStage], + right: Configurable +) -> [PipeStage] { + return left + [PipeStage(configuration: right.configuration, options: .default)] +} + +// MARK: - Cross-Platform Command Abstractions + +/// Cross-platform echo command abstraction +struct Echo: Configurable { + let message: String + + init(_ message: String) { + self.message = message + } + + var configuration: Configuration { + #if os(Windows) + return Configuration( + executable: .name("powershell.exe"), + arguments: Arguments(["-Command", "Write-Host '\(message)' -NoNewline"]) + ) + #else + return Configuration( + executable: .name("echo"), + arguments: Arguments([message]) + ) + #endif + } +} + +/// Cross-platform cat command abstraction +struct Cat: Configurable { + let arguments: [String] + + init(_ arguments: String...) { + self.arguments = arguments + } + + var configuration: Configuration { + #if os(Windows) + return Configuration( + executable: .name("cmd.exe"), + arguments: Arguments(["/c", "findstr x*"]) + ) + #else + return Configuration( + executable: .name("cat"), + arguments: Arguments(arguments) + ) + #endif + } +} + +/// Cross-platform wc command abstraction +struct Wc: Configurable { + let options: [String] + + init(_ options: String...) { + self.options = options + } + + var configuration: Configuration { + #if os(Windows) + // Windows doesn't have wc, use PowerShell for basic counting + if options.contains("-l") { + return Configuration( + executable: .name("powershell.exe"), + arguments: Arguments(["-Command", "($input | Measure-Object -Line).Lines"]) + ) + } else if options.contains("-w") { + return Configuration( + executable: .name("powershell.exe"), + arguments: Arguments(["-Command", "($input | Measure-Object -Word).Words"]) + ) + } else if options.contains("-c") { + return Configuration( + executable: .name("powershell.exe"), + arguments: Arguments(["-Command", "($input | Measure-Object -Character).Characters"]) + ) + } else { + return Configuration( + executable: .name("powershell.exe"), + arguments: Arguments(["-Command", "$input | Measure-Object -Line -Word -Character"]) + ) + } + #else + return Configuration( + executable: .name("wc"), + arguments: Arguments(options) + ) + #endif + } +} + +/// Cross-platform sort command abstraction +struct Sort: Configurable { + let options: [String] + + init(_ options: String...) { + self.options = options + } + + var configuration: Configuration { + #if os(Windows) + return Configuration( + executable: .name("sort"), + arguments: Arguments(options) + ) + #else + return Configuration( + executable: .name("sort"), + arguments: Arguments(options) + ) + #endif + } +} + +/// Cross-platform head command abstraction +struct Head: Configurable { + let options: [String] + + init(_ options: String...) { + self.options = options + } + + var configuration: Configuration { + #if os(Windows) + if let countOption = options.first, countOption.hasPrefix("-") { + let count = String(countOption.dropFirst()) + return Configuration( + executable: .name("powershell.exe"), + arguments: Arguments(["-Command", "$input | Select-Object -First \(count)"]) + ) + } else { + return Configuration( + executable: .name("powershell.exe"), + arguments: Arguments(["-Command", "$input | Select-Object -First 10"]) + ) + } + #else + return Configuration( + executable: .name("head"), + arguments: Arguments(options) + ) + #endif + } +} + +/// Cross-platform grep command abstraction +struct Grep: Configurable { + let pattern: String + let options: [String] + + init(_ pattern: String, options: String...) { + self.pattern = pattern + self.options = options + } + + var configuration: Configuration { + #if os(Windows) + return Configuration( + executable: .name("findstr"), + arguments: Arguments([pattern] + options) + ) + #else + return Configuration( + executable: .name("grep"), + arguments: Arguments([pattern] + options) + ) + #endif + } +} + +/// Cross-platform shell command abstraction +struct Shell: Configurable { + let command: String + + init(_ command: String) { + self.command = command + } + + var configuration: Configuration { + #if os(Windows) + return Configuration( + executable: .name("cmd.exe"), + arguments: Arguments(["/c", command]) + ) + #else + return Configuration( + executable: .name("sh"), + arguments: Arguments(["-c", command]) + ) + #endif + } +} + +@Suite(.serialized) +struct PipeConfigurationTests { + + // MARK: - Basic PipeConfiguration Tests + + @Test func testBasicPipeConfiguration() async throws { + let config = pipe( + Echo("Hello World") + ).stage( + Cat() + ).finally( + output: .string(limit: .max), + error: .discarded + ) + + let result = try await config.run() + #expect(result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) == "Hello World") + #expect(result.terminationStatus.isSuccess) + } + + @Test func testBasicSwiftFunctionBeginning() async throws { + let config = + pipe { input, output, error in + var foundHello = false + for try await line in input.lines() { + if line.hasPrefix("Hello") { + foundHello = true + } + } + + guard foundHello else { + return 1 + } + + let written = try await output.write("Hello World") + guard written == "Hello World".utf8.count else { + return 1 + } + return 0 + } | Cat() + |> ( + input: .string("Hello"), + output: .string(limit: .max), + error: .string(limit: .max) + ) + + let result = try await config.run() + #expect(result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) == "Hello World") + #expect(result.terminationStatus.isSuccess) + } + + @Test func testBasicSwiftFunctionMiddle() async throws { + let config = + pipe( + Echo("Hello") + ) | { input, output, error in + var foundHello = false + for try await line in input.lines() { + if line.hasPrefix("Hello") { + foundHello = true + } + } + + guard foundHello else { + return 1 + } + + let written = try await output.write("Hello World") + guard written == "Hello World".utf8.count else { + return 1 + } + return 0 + } | Cat() + |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + + let result = try await config.run() + #expect(result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) == "Hello World") + #expect(result.terminationStatus.isSuccess) + } + + @Test func testBasicSwiftFunctionEnd() async throws { + let config = + pipe( + Echo("Hello") + ) | { input, output, error in + var foundHello = false + for try await line in input.lines() { + if line.hasPrefix("Hello") { + foundHello = true + } + } + + guard foundHello else { + return 1 + } + + let written = try await output.write("Hello World") + guard written == "Hello World".utf8.count else { + return 1 + } + return 0 + } |> .string(limit: .max) + + let result = try await config.run() + #expect(result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) == "Hello World") + #expect(result.terminationStatus.isSuccess) + } + + @Test func testPipeConfigurationWithConfiguration() async throws { + let processConfig = + pipe( + Echo("Test Message") + ) | Cat() |> .string(limit: .max) + + let result = try await processConfig.run() + #expect(result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) == "Test Message") + #expect(result.terminationStatus.isSuccess) + } + + // MARK: - Pipe Method Tests + + @Test func testPipeMethod() async throws { + let pipeline = + pipe( + Echo("line1\nline2\nline3") + ) + | Wc("-l") + |> .string(limit: .max) + + let result = try await pipeline.run() + let lineCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(lineCount == "3") + #expect(result.terminationStatus.isSuccess) + } + + @Test func testPipeMethodWithConfiguration() async throws { + let pipeline = + pipe( + Echo("apple\nbanana\ncherry") + ) | Wc("-l") |> .string(limit: .max) + + let result = try await pipeline.run() + let lineCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(lineCount == "3") + #expect(result.terminationStatus.isSuccess) + } + + // MARK: - Pipe Operator Tests + + @Test func testBasicPipeOperator() async throws { + let pipeline = + pipe( + Echo("Hello\nWorld\nTest") + ) | Wc() + | Cat() + |> .string(limit: .max) + + let result = try await pipeline.run() + // wc output should contain line count + #expect(result.standardOutput?.contains("3") == true) + #expect(result.terminationStatus.isSuccess) + } + + @Test func testPipeOperatorWithExecutableOnly() async throws { + let pipeline = + pipe( + Echo("single line") + ) | Cat() // Simple pass-through + | Wc("-c") // Count characters + |> .string(limit: .max) + + let result = try await pipeline.run() + // Should count characters in "single line\n" (12 characters) + let charCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(charCount == "12" || charCount == "11") // Variation depending on the platform + #expect(result.terminationStatus.isSuccess) + } + + @Test func testPipeOperatorWithConfiguration() async throws { + let catConfig = Cat() + + let pipeline = + pipe( + Echo("test data") + ) | catConfig + | Wc("-w") // Count words + |> .string(limit: .max) + + let result = try await pipeline.run() + let wordCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(wordCount == "2") // "test data" = 2 words + #expect(result.terminationStatus.isSuccess) + } + + @Test func testPipeOperatorWithProcessHelper() async throws { + let pipeline = + pipe( + Echo( + """ + apple + banana + cherry + date + """ + ) + ) + | Head("-3") + | Wc("-l") + |> .string(limit: .max) + + let result = try await pipeline.run() + let lineCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(lineCount == "3") + #expect(result.terminationStatus.isSuccess) + } + + // MARK: - Complex Pipeline Tests + + @Test func testComplexPipeline() async throws { + let pipeline = + pipe( + Echo( + """ + zebra + apple + banana + cherry + """ + ) + ) + | Sort() + | Head() // Take first few lines (default) + | Wc("-l") + |> .string(limit: .max) + + let result = try await pipeline.run() + // Should have some lines (exact count depends on head default) + let lineCount = Int(result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) ?? "0") ?? 0 + #expect(lineCount > 0) + #expect(result.terminationStatus.isSuccess) + } + + // MARK: - Input Type Tests + + @Test func testPipelineWithStringInput() async throws { + let pipeline = + pipe( + Cat() + ) + | Wc("-w") // Count words + |> ( + input: .string("Hello world from string input"), + output: .string(limit: .max), + error: .discarded + ) + + let result = try await pipeline.run() + let wordCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(wordCount == "5") // "Hello world from string input" = 5 words + #expect(result.terminationStatus.isSuccess) + } + + @Test func testPipelineWithStringInputAndSwiftFunction() async throws { + let pipeline = + pipe( + { input, output, err in + var wordCount = 0 + for try await line in input.lines() { + let words = line.split(separator: " ") + wordCount += words.count + } + + let countString = "Word count: \(wordCount)" + let written = try await output.write(countString) + return written > 0 ? 0 : 1 + } + ) | Cat() + |> ( + input: .string("Swift functions can process string input efficiently"), + output: .string(limit: .max), + error: .string(limit: .max) + ) + + let result = try await pipeline.run() + #expect(result.standardOutput?.contains("Word count: 7") == true) + #expect(result.terminationStatus.isSuccess) + } + + @Test func testSwiftFunctionAsFirstStageWithStringInput() async throws { + let pipeline = + pipe( + { input, output, err in + // Convert input to uppercase and add line numbers + var lineNumber = 1 + for try await line in input.lines() { + let uppercaseLine = "\(lineNumber): \(line.uppercased())\n" + _ = try await output.write(uppercaseLine) + lineNumber += 1 + } + return 0 + } + ) | Cat() // Use cat instead of head to see all output + |> ( + input: .string("first line\nsecond line\nthird line"), + output: .string(limit: .max), + error: .discarded + ) + + let result = try await pipeline.run() + let output = result.standardOutput ?? "" + #expect(output.contains("1: FIRST LINE")) + #expect(output.contains("2: SECOND LINE")) + #expect(output.contains("3: THIRD LINE")) + #expect(result.terminationStatus.isSuccess) + } + + @Test func testProcessStageWithFileDescriptorInput() async throws { + // Create a temporary file with test content + let tempURL = FileManager.default.temporaryDirectory.appendingPathComponent("pipe_test_\(UUID().uuidString).txt") + let testContent = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5" + try testContent.write(to: tempURL, atomically: true, encoding: .utf8) + + defer { + try? FileManager.default.removeItem(at: tempURL) + } + + // Open file descriptor for reading + let fileDescriptor = try FileDescriptor.open(FilePath(tempURL.path), .readOnly) + defer { + try? fileDescriptor.close() + } + + let pipeline = + pipe( + Head("-3") + ) + | Wc("-l") + |> ( + input: .fileDescriptor(fileDescriptor, closeAfterSpawningProcess: false), + output: .string(limit: .max), + error: .discarded + ) + + let result = try await pipeline.run() + let lineCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(lineCount == "3") // head -3 should give us 3 lines + #expect(result.terminationStatus.isSuccess) + } + + @Test func testSwiftFunctionWithFileDescriptorInput() async throws { + // Create a temporary file with JSON content + let tempURL = FileManager.default.temporaryDirectory.appendingPathComponent("json_test_\(UUID().uuidString).json") + let jsonContent = #"{"name": "Alice", "age": 30, "city": "New York"}"# + try jsonContent.write(to: tempURL, atomically: true, encoding: .utf8) + + defer { + try? FileManager.default.removeItem(at: tempURL) + } + + // Open file descriptor for reading + let fileDescriptor = try FileDescriptor.open(FilePath(tempURL.path), .readOnly) + defer { + try? fileDescriptor.close() + } + + struct Person: Codable { + let name: String + let age: Int + let city: String + } + + let pipeline = + pipe( + { input, output, err in + var jsonData = Data() + for try await chunk in input.lines() { + jsonData.append(contentsOf: chunk.utf8) + } + + do { + let decoder = JSONDecoder() + let person = try decoder.decode(Person.self, from: jsonData) + let summary = "Person: \(person.name), Age: \(person.age), Location: \(person.city)" + let written = try await output.write(summary) + return written > 0 ? 0 : 1 + } catch { + _ = try await err.write("JSON parsing failed: \(error)") + return 1 + } + } + ) | Cat() // Add second stage to make it a valid pipeline + |> ( + input: .fileDescriptor(fileDescriptor, closeAfterSpawningProcess: false), + output: .string(limit: .max), + error: .string(limit: .max) + ) + + let result = try await pipeline.run() + #expect(result.standardOutput?.contains("Person: Alice, Age: 30, Location: New York") == true) + #expect(result.terminationStatus.isSuccess) + } + + @Test func testComplexPipelineWithStringInputAndSwiftFunction() async throws { + let csvData = "name,score,grade\nAlice,95,A\nBob,87,B\nCharlie,92,A\nDave,78,C" + + let pipeline = + pipe( + { input, output, err in + // Parse CSV and filter for A grades + var lineCount = 0 + for try await line in input.lines() { + lineCount += 1 + let trimmedLine = line.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + + // Skip header line + if lineCount == 1 { + continue + } + + let components = trimmedLine.split(separator: ",").map { String($0) } + if components.count >= 3 && components[2] == "A" { + let name = components[0] + let score = components[1] + _ = try await output.write("\(name): \(score)\n") + } + } + return 0 + } + ) | Cat() + |> ( + input: .string(csvData), + output: .string(limit: .max), + error: .string(limit: .max) + ) + + let result = try await pipeline.run() + let output = result.standardOutput ?? "" + #expect(output.contains("Alice: 95")) + #expect(output.contains("Charlie: 92")) + #expect(!output.contains("Bob")) // Bob has grade B, should be filtered out + #expect(!output.contains("Dave")) // Dave has grade C, should be filtered out + #expect(result.terminationStatus.isSuccess) + } + + @Test func testMultiStageSwiftFunctionPipelineWithStringInput() async throws { + let numbers = "10\n25\n7\n42\n13\n8\n99" + + let pipeline = + pipe( + { input, output, err in + // First Swift function: filter for numbers > 10 + for try await line in input.lines() { + let trimmed = line.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + if !trimmed.isEmpty, let number = Int(trimmed), number > 10 { + _ = try await output.write("\(number)\n") + } + } + return 0 + } + ) | { input, output, err in + // Second Swift function: double the numbers + for try await line in input.lines() { + let trimmed = line.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + if !trimmed.isEmpty, let number = Int(trimmed) { + let doubled = number * 2 + _ = try await output.write("\(doubled)\n") + } + } + return 0 + } + | Cat() + |> ( + input: .string(numbers), + output: .string(limit: .max), + error: .string(limit: .max) + ) + + let result = try await pipeline.run() + let output = result.standardOutput ?? "" + let lines = output.split(separator: "\n").compactMap { line in + let trimmed = line.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + return trimmed.isEmpty ? nil : Int(trimmed) + } + + // Input: 10, 25, 7, 42, 13, 8, 99 + // After filter (> 10): 25, 42, 13, 99 + // After doubling: 50, 84, 26, 198 + #expect(lines.contains(50)) // 25 * 2 + #expect(lines.contains(84)) // 42 * 2 + #expect(lines.contains(26)) // 13 * 2 + #expect(lines.contains(198)) // 99 * 2 + + // These should NOT be present (filtered out) + #expect(!lines.contains(20)) // 10 * 2 (10 not > 10) + #expect(!lines.contains(14)) // 7 * 2 (7 <= 10) + #expect(!lines.contains(16)) // 8 * 2 (8 <= 10) + + #expect(result.terminationStatus.isSuccess) + } + + // MARK: - Shared Error Handling Tests + + @Test func testSharedErrorHandlingInPipeline() async throws { + // FIXME - There is a race condition here that truncates the stderr on both Linux and macOS - The sleep helps to mitigate + let pipeline = + pipe( + Shell("echo 'first stdout'; echo 'first stderr' >&2; sleep 1") + ) + | Shell("echo 'second stdout'; echo 'second stderr' >&2; sleep 1") + |> ( + output: .string(limit: .max), + error: .string(limit: 1024), + ) + + let result = try await pipeline.run() + let errorOutput = result.standardError ?? "" + + // Both stages should contribute to shared stderr + #expect(errorOutput.contains("first stderr")) + #expect(errorOutput.contains("second stderr")) + #expect(result.terminationStatus.isSuccess) + } + + @Test func testSharedErrorHandlingWithSwiftFunction() async throws { + #if os(Windows) + let pipeline = + pipe( + { input, output, err in + _ = try await err.write("Swift function error\n") + return 0 + } + ) + | ( + .name("powershell.exe"), + arguments: Arguments(["-Command", "'shell stdout'; [Console]::Error.WriteLine('shell stderr')"]) + ) |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + #else + let pipeline = + pipe( + { input, output, err in + _ = try await err.write("Swift function error\n") + return 0 + } + ) + | ( + .name("sh"), + arguments: ["-c", "echo 'shell stdout'; echo 'shell stderr' >&2"] + ) |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + #endif + + let result = try await pipeline.run() + let errorOutput = result.standardError ?? "" + + // Both Swift function and shell process should contribute to stderr + #expect(errorOutput.contains("Swift function error")) + #expect(errorOutput.contains("shell stderr")) + #expect(result.terminationStatus.isSuccess) + } + + @Test func testSharedErrorRespectingMaxSize() async throws { + let longErrorMessage = String(repeating: "error", count: 100) // 500 characters + + let pipeline = + pipe( + .name("sh"), + arguments: ["-c", "echo '\(longErrorMessage)' >&2"] + ) + | ( + .name("sh"), + arguments: ["-c", "echo '\(longErrorMessage)' >&2"] + ) |> ( + output: .string(limit: .max), + error: .string(limit: 100) // Limit error to 100 bytes + ) + + await #expect(throws: SubprocessError.self) { + try await pipeline.run() + } + } + + // MARK: - Error Redirection Tests + + @Test func testSeparateErrorRedirection() async throws { + #if os(Windows) + let config = + pipe( + { input, output, err in + _ = try await err.write("Swift function error\n") + return 0 + } + ) + | ( + .name("powershell.exe"), + arguments: Arguments(["-Command", "'shell stdout'; [Console]::Error.WriteLine('shell stderr')"]), + options: .default + ) |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + #else + let config = + pipe( + { input, output, err in + _ = try await err.write("Swift function error\n") + return 0 + } + ) + | ( + .name("sh"), + arguments: ["-c", "echo 'shell stdout'; echo 'shell stderr' >&2"], + options: .default + ) |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + #endif + + let result = try await config.run() + #expect(result.standardOutput?.contains("stdout") == true) + #expect(result.standardError?.contains("stderr") == true) + #expect(result.terminationStatus.isSuccess) + } + + // FIXME these tend to cause hangs on Windows in CI + #if !os(Windows) + @Test func testMergeErrorRedirection() async throws { + #if os(Windows) + let config = + pipe( + .name("powershell.exe"), + arguments: Arguments(["-Command", "'shell stdout'; [Console]::Error.WriteLine('shell stderr')"]), + options: .mergeErrors + ) | Grep("shell") |> ( + output: .string(limit: .max), + error: .discarded + ) + #else + let config = + pipe( + .name("sh"), + arguments: ["-c", "echo 'shell stdout'; echo 'shell stderr' >&2"], + options: .mergeErrors + ) | Grep("shell") |> ( + output: .string(limit: .max), + error: .discarded + ) + #endif + + let result = try await config.run() + // With merge, both stdout and stderr content should appear in the output stream + // Since both streams are directed to the same destination (.output), + // the merged content should appear in standardOutput + #expect(result.standardOutput?.contains("stdout") == true) + #expect(result.standardOutput?.contains("stderr") == true) + #expect(result.terminationStatus.isSuccess) + } + + @Test func testErrorRedirectionWithPipeOperators() async throws { + #if os(Windows) + let pipeline = + pipe( + .name("powershell.exe"), + arguments: Arguments(["-Command", "'line1'; [Console]::Error.WriteLine('error1')"]), + options: .mergeErrors // Merge stderr into stdout + ) + | Grep("error") + | Wc("-l") + |> ( + output: .string(limit: .max), + error: .discarded + ) + #else + let pipeline = + pipe( + .name("sh"), + arguments: ["-c", "echo 'line1'; echo 'error1' >&2"], + options: .mergeErrors // Merge stderr into stdout + ) + | Grep("error") + | Wc("-l") + |> ( + output: .string(limit: .max), + error: .discarded + ) + #endif + + let result = try await pipeline.run() + // Should find the error line that was merged into stdout + let lineCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(lineCount == "1") + #expect(result.terminationStatus.isSuccess) + } + #endif + + @Test func testProcessHelperWithErrorRedirection() async throws { + let pipeline = + pipe(Echo("data")) + | Cat() // Simple passthrough, no error redirection needed + | Wc("-c") + |> .string(limit: .max) + + let result = try await pipeline.run() + // Should count characters in "data\n" (5 characters) + let charCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(charCount == "5" || charCount == "4") // Slight difference in character count between platforms + #expect(result.terminationStatus.isSuccess) + } + + // MARK: - Error Handling Tests + + @Test func testPipelineErrorHandling() async throws { + // Create a pipeline where one command will fail + let pipeline = + pipe(Echo("test")) + | .name("nonexistent-command") // This should fail + | Cat() |> .string(limit: .max) + + await #expect(throws: (any Error).self) { + _ = try await pipeline.run() + } + } + + // MARK: - String Interpolation and Description Tests + + @Test func testPipeConfigurationDescription() { + let config = pipe( + .name("echo"), + arguments: ["echo"] + ).finally( + output: .string(limit: .max) + ) + + let description = config.description + #expect(description.contains("PipeConfiguration")) + #expect(description.contains("echo")) + } + + @Test func testPipelineDescription() { + let pipeline = + pipe(Echo("test")) + | Cat() + | Wc() + |> .string(limit: .max) + + let description = pipeline.description + #expect(description.contains("Pipeline with")) + #expect(description.contains("stages")) + } + + // MARK: - Helper Function Tests + + @Test func testFinallyHelper() async throws { + let pipeline = + pipe(Echo("helper test")) | Cat() |> .string(limit: .max) + + let result = try await pipeline.run() + #expect(result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) == "helper test") + #expect(result.terminationStatus.isSuccess) + } + + @Test func testProcessHelper() async throws { + let pipeline = + pipe(Echo("process helper test")) + | Cat() + | Wc("-c") + |> .string(limit: .max) + + let result = try await pipeline.run() + // "process helper test\n" should be 20 characters + let charCount = result.standardOutput?.trimmingCharacters(in: CharacterSet.whitespacesAndNewlines) + #expect(charCount == "20" || charCount == "19") // Slight difference in character counts between platforms + #expect(result.terminationStatus.isSuccess) + } + + // MARK: - Swift Lambda Tests (Compilation Only) + + // Note: Full Swift lambda execution tests are omitted for now due to generic type inference complexity + // The Swift lambda functionality is implemented and working, as demonstrated by the successful + // testMergeErrorRedirection test which uses Swift lambda internally for cross-platform error merging + + // MARK: - Swift Function Tests (Compilation Only) + + // Note: These tests verify that the Swift function APIs compile correctly + // Full execution tests are complex due to buffer handling and are omitted for now + + // MARK: - JSON Processing with Swift Functions + + @Test func testJSONEncodingPipeline() async throws { + struct Person: Codable { + let name: String + let age: Int + } + + let people = [ + Person(name: "Alice", age: 30), + Person(name: "Bob", age: 25), + Person(name: "Charlie", age: 35), + ] + + let pipeline = + pipe( + { input, output, err in + // Encode array of Person objects to JSON + let encoder = JSONEncoder() + encoder.outputFormatting = .prettyPrinted + + do { + let jsonData = try encoder.encode(people) + let jsonString = String(data: jsonData, encoding: .utf8) ?? "" + let written = try await output.write(jsonString) + return written > 0 ? 0 : 1 + } catch { + _ = try await err.write("JSON encoding failed: \(error)") + return 1 + } + } + ) + | ( + .name("jq"), + arguments: [".[] | select(.age > 28)"] // Filter people over 28 + ) |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + + // This test is for compilation only - would need jq installed to run + #expect(pipeline.stages.count == 2) + } + + @Test func testJSONDecodingPipeline() async throws { + struct User: Codable { + let id: Int + let username: String + let email: String + } + + let usersJson = #"[{"id": 1, "username": "alice", "email": "alice@example.com"}, {"id": 2, "username": "bob", "email": "bob@example.com"}, {"id": 3, "username": "charlie", "email": "charlie@example.com"}, {"id": 6, "username": "dave", "email": "dave@example.com"}]"# + + let pipeline = + pipe( + .name("echo"), + arguments: [usersJson] + ) | { input, output, err in + // Read JSON and decode to User objects + var jsonData = Data() + + for try await chunk in input.lines() { + jsonData.append(contentsOf: chunk.utf8) + } + + do { + let decoder = JSONDecoder() + let users = try decoder.decode([User].self, from: jsonData) + + // Filter and transform users + let filteredUsers = users.filter { $0.id <= 5 } + let usernames = filteredUsers.map { $0.username }.joined(separator: "\n") + + let written = try await output.write(usernames) + return written > 0 ? 0 : 1 + } catch { + _ = try await err.write("JSON decoding failed: \(error)") + return 1 + } + } | .name("sort") + |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + + // This test is for compilation only + #expect(pipeline.stages.count == 3) + } + + @Test func testJSONTransformationPipeline() async throws { + struct InputData: Codable { + let items: [String] + let metadata: [String: String] + } + + struct OutputData: Codable { + let processedItems: [String] + let itemCount: Int + let processingDate: String + } + + let pipeline = + pipe( + .name("echo"), + arguments: [#"{"items": ["apple", "banana", "cherry"], "metadata": {"source": "test"}}"#] + ) | { input, output, err in + // Transform JSON structure + var jsonData = Data() + + for try await chunk in input.lines() { + jsonData.append(contentsOf: chunk.utf8) + } + + do { + let decoder = JSONDecoder() + let inputData = try decoder.decode(InputData.self, from: jsonData) + + let outputData = OutputData( + processedItems: inputData.items.map { $0.uppercased() }, + itemCount: inputData.items.count, + processingDate: ISO8601DateFormatter().string(from: Date()) + ) + + let encoder = JSONEncoder() + encoder.outputFormatting = .prettyPrinted + let outputJson = try encoder.encode(outputData) + let jsonString = String(data: outputJson, encoding: .utf8) ?? "" + + let written = try await output.write(jsonString) + return written > 0 ? 0 : 1 + } catch { + _ = try await err.write("JSON transformation failed: \(error)") + return 1 + } + } |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + + // This test is for compilation only + #expect(pipeline.stages.count == 2) + } + + @Test func testJSONStreamProcessing() async throws { + struct LogEntry: Codable { + let timestamp: String + let level: String + let message: String + } + + let pipeline = + pipe( + .name("tail"), + arguments: ["-f", "/var/log/app.log"] + ) | { input, output, error in + // Process JSON log entries line by line + for try await line in input.lines() { + guard !line.isEmpty else { continue } + + do { + let decoder = JSONDecoder() + let logEntry = try decoder.decode(LogEntry.self, from: line.data(using: .utf8) ?? Data()) + + // Filter for error/warning logs and format output + if ["ERROR", "WARN"].contains(logEntry.level) { + let formatted = "[\(logEntry.timestamp)] \(logEntry.level): \(logEntry.message)" + _ = try await output.write(formatted + "\n") + } + } catch { + // Skip malformed JSON lines + continue + } + } + return 0 + } + | ( + .name("head"), + arguments: ["-20"] // Limit to first 20 error/warning entries + ) |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + + // This test is for compilation only + #expect(pipeline.stages.count == 3) + } + + @Test func testJSONAggregationPipeline() async throws { + struct SalesRecord: Codable { + let product: String + let amount: Double + let date: String + } + + struct SalesSummary: Codable { + let totalSales: Double + let productCounts: [String: Int] + let averageSale: Double + } + + let pipeline = + pipe( + .name("cat"), + arguments: ["sales_data.jsonl"] // JSON Lines format + ) | { input, output, err in + // Aggregate JSON sales data + var totalSales: Double = 0 + var productCounts: [String: Int] = [:] + var recordCount = 0 + + for try await line in input.lines() { + guard !line.isEmpty else { continue } + + do { + let decoder = JSONDecoder() + let record = try decoder.decode(SalesRecord.self, from: line.data(using: .utf8) ?? Data()) + + totalSales += record.amount + productCounts[record.product, default: 0] += 1 + recordCount += 1 + } catch { + // Log parsing errors but continue + _ = try await err.write("Failed to parse line: \(line)\n") + } + } + + let summary = SalesSummary( + totalSales: totalSales, + productCounts: productCounts, + averageSale: recordCount > 0 ? totalSales / Double(recordCount) : 0 + ) + + do { + let encoder = JSONEncoder() + encoder.outputFormatting = .prettyPrinted + let summaryJson = try encoder.encode(summary) + let jsonString = String(data: summaryJson, encoding: .utf8) ?? "" + + let written = try await output.write(jsonString) + return written > 0 ? 0 : 1 + } catch { + _ = try await err.write("Failed to encode summary: \(error)") + return 1 + } + } |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + + // This test is for compilation only + #expect(pipeline.stages.count == 2) + } + + @Test func testJSONValidationPipeline() async throws { + struct Config: Codable { + let version: String + let settings: [String: String] + let enabled: Bool + } + + let pipeline = + pipe( + .name("find"), + arguments: ["/etc/configs", "-name", "*.json"] + ) + | ( + .name("xargs"), + arguments: ["cat"] + ) | { input, output, err in + // Validate JSON configurations + var validConfigs = 0 + var invalidConfigs = 0 + var currentJson = "" + + for try await line in input.lines() { + if line.trimmingCharacters(in: .whitespaces).isEmpty { + // End of JSON object, try to validate + if !currentJson.isEmpty { + do { + let decoder = JSONDecoder() + let config = try decoder.decode(Config.self, from: currentJson.data(using: .utf8) ?? Data()) + + // Additional validation + if !config.version.isEmpty && config.enabled { + validConfigs += 1 + _ = try await output.write("VALID: \(config.version)\n") + } else { + invalidConfigs += 1 + _ = try await err.write("INVALID: Missing version or disabled\n") + } + } catch { + invalidConfigs += 1 + _ = try await err.write("PARSE_ERROR: \(error)\n") + } + currentJson = "" + } + } else { + currentJson += line + "\n" + } + } + + // Process any remaining JSON + if !currentJson.isEmpty { + do { + let decoder = JSONDecoder() + let config = try decoder.decode(Config.self, from: currentJson.data(using: .utf8) ?? Data()) + if !config.version.isEmpty && config.enabled { + validConfigs += 1 + _ = try await output.write("VALID: \(config.version)\n") + } + } catch { + invalidConfigs += 1 + _ = try await err.write("PARSE_ERROR: \(error)\n") + } + } + + // Summary + _ = try await output.write("\nSUMMARY: \(validConfigs) valid, \(invalidConfigs) invalid\n") + return invalidConfigs > 0 ? 1 : 0 + } |> ( + output: .string(limit: .max), + error: .string(limit: .max) + ) + + // This test is for compilation only + #expect(pipeline.stages.count == 3) + } +} + +// MARK: - Compilation Tests (no execution) + +extension PipeConfigurationTests { + + @Test func testCompilationOfVariousPatterns() { + // These tests just verify that various patterns compile correctly + // They don't execute to avoid platform dependencies + + // Basic pattern with error redirection + let _ = pipe( + .name("sh"), + arguments: ["-c", "echo test >&2"], + options: .mergeErrors + ).finally( + output: .string(limit: .max), + error: .string(limit: .max) + ) + + // Pipe pattern + let _ = + pipe(.name("echo")) + | .name("cat") + | .name("wc") + |> .string(limit: .max) + + // Pipe pattern with error redirection + let _ = + pipe(.name("echo")) + | ( + configuration: Configuration(executable: .name("cat")), + options: .mergeErrors + ) + | .name("wc") + |> .string(limit: .max) + + // Complex pipeline pattern with process helper and error redirection + let _ = + pipe( + .name("find"), + arguments: ["/tmp"] + ) | (.name("head"), arguments: ["-10"], options: .mergeErrors) + | .name("sort") + | (.name("tail"), arguments: ["-5"]) |> .string(limit: .max) + + // Configuration-based pattern with error redirection + let config = Configuration(.name("ls")) + let _ = + pipe( + config, + options: .mergeErrors + ) + | .name("wc") + | .name("cat") + |> .string(limit: .max) + + // Swift function patterns (compilation only) + let _ = pipe( + { input, output, error in + // Compilation test - no execution needed + return 0 + } + ).finally( + output: .string(limit: .max) + ) + + let _ = pipe( + { input, output, error in + // Compilation test - no execution needed + return 0 + } + ).finally( + input: .string("test"), + output: .string(limit: .max), + error: .discarded + ) + + // Mixed pipeline with Swift functions (compilation only) + let _ = + pipe( + .name("echo"), + arguments: ["start"] + ) | { input, output, error in + // This is a compilation test - the function body doesn't need to be executable + return 0 + } | { input, output, error in + // This is a compilation test - the function body doesn't need to be executable + return 0 + } | { input, output, error in + return 0 + } |> ( + output: .string(limit: .max), + error: .discarded + ) + + // Swift function with finally helper + let _ = + pipe( + .name("echo") + ) | { input, output, error in + return 0 + } |> ( + output: .string(limit: .max), + error: .discarded + ) + + #expect(Bool(true)) // All patterns compiled successfully + } +}