From 0712fdb9ee97c71a4dc1579c0c4a9683e9942ccc Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Mon, 19 May 2025 11:54:21 -0700 Subject: [PATCH 1/8] Introduce custom initializers to initialize String, Data, and Array from AsyncBufferSequence.Buffer --- Sources/Subprocess/API.swift | 4 +- Sources/Subprocess/Buffer.swift | 125 +++++++++++------- .../Output+Foundation.swift | 11 ++ .../SubprocessTests+Unix.swift | 8 +- .../SubprocessTests+Windows.swift | 6 +- 5 files changed, 99 insertions(+), 55 deletions(-) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index ea458db..ddd4bf4 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -168,7 +168,7 @@ public func run( workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), input: Input = .none, - error: Error, + error: Error = .discarded, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Error.OutputType == Void { @@ -267,7 +267,7 @@ public func run( environment: Environment = .inherit, workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), - error: Error, + error: Error = .discarded, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionResult where Error.OutputType == Void { diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index aae5a09..657bc83 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -18,13 +18,13 @@ extension AsyncBufferSequence { /// A immutable collection of bytes public struct Buffer: Sendable { #if os(Windows) - private var data: [UInt8] + internal var data: [UInt8] internal init(data: [UInt8]) { self.data = data } #else - private var data: DispatchData + internal var data: DispatchData internal init(data: DispatchData) { self.data = data @@ -54,7 +54,6 @@ extension AsyncBufferSequence.Buffer { @available(SubprocessSpan, *) #endif extension AsyncBufferSequence.Buffer { - #if !SubprocessSpan /// Access the raw bytes stored in this buffer /// - Parameter body: A closure with an `UnsafeRawBufferPointer` parameter that /// points to the contiguous storage for the type. If no such storage exists, @@ -64,13 +63,6 @@ extension AsyncBufferSequence.Buffer { /// - Returns: The return value, if any, of the body closure parameter. public func withUnsafeBytes( _ body: (UnsafeRawBufferPointer) throws -> ResultType - ) rethrows -> ResultType { - return try self._withUnsafeBytes(body) - } - #endif // !SubprocessSpan - - internal func _withUnsafeBytes( - _ body: (UnsafeRawBufferPointer) throws -> ResultType ) rethrows -> ResultType { #if os(Windows) return try self.data.withUnsafeBytes(body) @@ -88,44 +80,46 @@ extension AsyncBufferSequence.Buffer { #if SubprocessSpan // Access the storge backing this Buffer public var bytes: RawSpan { - var backing: SpanBacking? - #if os(Windows) - self.data.withUnsafeBufferPointer { - backing = .pointer($0) - } - #else - self.data.enumerateBytes { buffer, byteIndex, stop in - if _fastPath(backing == nil) { - // In practice, almost all `DispatchData` is contiguous - backing = .pointer(buffer) - } else { - // This DispatchData is not contiguous. We need to copy - // the bytes out - let contents = Array(buffer) - switch backing! { - case .pointer(let ptr): - // Convert the ptr to array - let existing = Array(ptr) - backing = .array(existing + contents) - case .array(let array): - backing = .array(array + contents) + @lifetime(borrow self) + borrowing get { + var backing: SpanBacking? + #if os(Windows) + self.data.withUnsafeBufferPointer { + backing = .pointer($0) + } + #else + self.data.enumerateBytes { buffer, byteIndex, stop in + if _fastPath(backing == nil) { + // In practice, almost all `DispatchData` is contiguous + backing = .pointer(buffer) + } else { + // This DispatchData is not contiguous. We need to copy + // the bytes out + let contents = Array(buffer) + switch backing! { + case .pointer(let ptr): + // Convert the ptr to array + let existing = Array(ptr) + backing = .array(existing + contents) + case .array(let array): + backing = .array(array + contents) + } } } - } - #endif - guard let backing = backing else { - let empty = UnsafeRawBufferPointer(start: nil, count: 0) - let span = RawSpan(_unsafeBytes: empty) - return _overrideLifetime(of: span, to: self) - } - switch backing { - case .pointer(let ptr): - let span = RawSpan(_unsafeElements: ptr) - return _overrideLifetime(of: span, to: self) - case .array(let array): - let ptr = array.withUnsafeBytes { $0 } - let span = RawSpan(_unsafeBytes: ptr) - return _overrideLifetime(of: span, to: self) + #endif + guard let backing = backing else { + let empty = UnsafeRawBufferPointer(start: nil, count: 0) + let span = RawSpan(_unsafeBytes: empty) + return _overrideLifetime(of: span, to: self) + } + switch backing { + case .pointer(let ptr): + let span = RawSpan(_unsafeElements: ptr) + return _overrideLifetime(of: span, to: self) + case .array(let array): + let span = array.bytes + return _overrideLifetime(of: span, to: self) + } } } #endif // SubprocessSpan @@ -159,3 +153,42 @@ extension AsyncBufferSequence.Buffer: Equatable, Hashable { } #endif } + +// MARK: - Initializers +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif +extension String { + /// Create a String with the given encoding from `Buffer`. + /// - Parameters: + /// - buffer: the buffer to copy from + /// - encoding: the encoding to encode Self with + public init?(buffer: AsyncBufferSequence.Buffer, as encoding: Encoding.Type) { +#if os(Windows) + let source = buffer.data.map { Encoding.CodeUnit($0) } + self = String(decoding: source, as: encoding) +#else + self = buffer.withUnsafeBytes { ptr in + return String( + decoding: ptr.bindMemory(to: Encoding.CodeUnit.self).lazy.map { $0 }, + as: encoding + ) + } +#endif + } +} + +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif +extension Array where Element == UInt8 { + /// Create an Array from `Buffer` + /// - Parameter buffer: the buffer to copy from + public init(buffer: AsyncBufferSequence.Buffer) { +#if os(Windows) + self = buffer.data +#else + self = Array(buffer.data) +#endif + } +} diff --git a/Sources/Subprocess/SubprocessFoundation/Output+Foundation.swift b/Sources/Subprocess/SubprocessFoundation/Output+Foundation.swift index b6d4bd5..29129b3 100644 --- a/Sources/Subprocess/SubprocessFoundation/Output+Foundation.swift +++ b/Sources/Subprocess/SubprocessFoundation/Output+Foundation.swift @@ -61,6 +61,17 @@ extension OutputProtocol where Self == DataOutput { } } +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif +extension Data { + /// Create a `Data` from `Buffer` + /// - Parameter buffer: buffer to copy from + public init(buffer: AsyncBufferSequence.Buffer) { + self = Data(buffer.data) + } +} + // MARK: - Workarounds #if SubprocessSpan @available(SubprocessSpan, *) diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index a66ac8d..0450617 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -433,7 +433,7 @@ extension SubprocessUnixTests { ) { execution, standardOutput in var buffer = Data() for try await chunk in standardOutput { - let currentChunk = chunk._withUnsafeBytes { Data($0) } + let currentChunk = chunk.withUnsafeBytes { Data($0) } buffer += currentChunk } return buffer @@ -472,7 +472,7 @@ extension SubprocessUnixTests { ) { execution, standardOutput in var buffer = Data() for try await chunk in standardOutput { - let currentChunk = chunk._withUnsafeBytes { Data($0) } + let currentChunk = chunk.withUnsafeBytes { Data($0) } buffer += currentChunk } return buffer @@ -620,7 +620,7 @@ extension SubprocessUnixTests { ) { execution, standardOutput in var buffer = Data() for try await chunk in standardOutput { - let currentChunk = chunk._withUnsafeBytes { Data($0) } + let currentChunk = chunk.withUnsafeBytes { Data($0) } buffer += currentChunk } return buffer @@ -828,7 +828,7 @@ extension SubprocessUnixTests { group.addTask { var outputs: [String] = [] for try await bit in standardOutput { - let bitString = bit._withUnsafeBytes { ptr in + let bitString = bit.withUnsafeBytes { ptr in return String(decoding: ptr, as: UTF8.self) }.trimmingCharacters(in: .whitespacesAndNewlines) if bitString.contains("\n") { diff --git a/Tests/SubprocessTests/SubprocessTests+Windows.swift b/Tests/SubprocessTests/SubprocessTests+Windows.swift index 0def034..2226cc2 100644 --- a/Tests/SubprocessTests/SubprocessTests+Windows.swift +++ b/Tests/SubprocessTests/SubprocessTests+Windows.swift @@ -367,7 +367,7 @@ extension SubprocessWindowsTests { ) { execution, standardOutput in var buffer = Data() for try await chunk in standardOutput { - let currentChunk = chunk._withUnsafeBytes { Data($0) } + let currentChunk = chunk.withUnsafeBytes { Data($0) } buffer += currentChunk } return buffer @@ -407,7 +407,7 @@ extension SubprocessWindowsTests { ) { execution, standardOutput in var buffer = Data() for try await chunk in standardOutput { - let currentChunk = chunk._withUnsafeBytes { Data($0) } + let currentChunk = chunk.withUnsafeBytes { Data($0) } buffer += currentChunk } return buffer @@ -504,7 +504,7 @@ extension SubprocessWindowsTests { ) { subprocess, standardOutput in var buffer = Data() for try await chunk in standardOutput { - let currentChunk = chunk._withUnsafeBytes { Data($0) } + let currentChunk = chunk.withUnsafeBytes { Data($0) } buffer += currentChunk } return buffer From a7742717b9b5c52b52fa3342ba1bfe5d65b8d27f Mon Sep 17 00:00:00 2001 From: Guillaume Lessard Date: Mon, 19 May 2025 18:13:34 -0700 Subject: [PATCH 2/8] Update Sources/Subprocess/Buffer.swift --- Sources/Subprocess/Buffer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index 657bc83..135d856 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -117,7 +117,7 @@ extension AsyncBufferSequence.Buffer { let span = RawSpan(_unsafeElements: ptr) return _overrideLifetime(of: span, to: self) case .array(let array): - let span = array.bytes + let span = array.span.bytes return _overrideLifetime(of: span, to: self) } } From c3e85bbc9d4acf64c1f3c313842976e2aafdf4ec Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Thu, 29 May 2025 15:31:49 -0700 Subject: [PATCH 3/8] Implement LineSequence --- Sources/Subprocess/AsyncBufferSequence.swift | 236 ++++++++++++++++++ .../SubprocessTests+Unix.swift | 118 +++++++++ 2 files changed, 354 insertions(+) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 442e07e..e884df1 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -74,6 +74,242 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { public func makeAsyncIterator() -> Iterator { return Iterator(diskIO: self.diskIO) } + + public func lines( + encoding: Encoding.Type = UTF8.self, + bufferingPolicy: LineSequence.BufferingPolicy = .unbounded + ) -> LineSequence { + return LineSequence(underlying: self, encoding: encoding, bufferingPolicy: bufferingPolicy) + } +} + +// MARK: - LineSequence +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif +extension AsyncBufferSequence { + public struct LineSequence: AsyncSequence, Sendable { + public typealias Element = String + + private let base: AsyncBufferSequence + private let bufferingPolicy: BufferingPolicy + + public struct AsyncIterator: AsyncIteratorProtocol { + public typealias Element = String + + private var source: AsyncBufferSequence.AsyncIterator + private var buffer: [Encoding.CodeUnit] + private var eofReached: Bool + private var startIndex: Int + private let bufferingPolicy: BufferingPolicy + + internal init( + underlyingIterator: AsyncBufferSequence.AsyncIterator, + bufferingPolicy: BufferingPolicy + ) { + self.source = underlyingIterator + self.buffer = [] + self.eofReached = false + self.startIndex = 0 + self.bufferingPolicy = bufferingPolicy + } + + public mutating func next() async throws -> String? { + + func loadBuffer() async throws -> [Encoding.CodeUnit]? { + guard !self.eofReached else { + return nil + } + + guard let buffer = try await self.source.next() else { + self.eofReached = true + return nil + } + #if os(Windows) + // Cast data to CodeUnit type + let result = buffer.withUnsafeBytes { ptr in + return Array( + UnsafeBufferPointer( + start: ptr.bindMemory(to: Encoding.CodeUnit.self).baseAddress!, + count: ptr.count / MemoryLayout.size + ) + ) + } + #else + // Unfortunitely here we _have to_ copy the bytes out because + // DisptachIO (rightfully) reuses buffer, which means `buffer.data` + // has the same address on all iterations, therefore we can't directly + // create the result array from buffer.data + let temporary = UnsafeMutableBufferPointer.allocate( + capacity: buffer.data.count + ) + defer { temporary.deallocate() } + let actualBytesCopied = buffer.data.copyBytes(to: temporary) + + // Calculate how many CodePoint elements we have + let elementCount = actualBytesCopied / MemoryLayout.stride + + // Create array by copying from the buffer reinterpreted as CodePoint + let result: Array = Array( + UnsafeBufferPointer(start: temporary.baseAddress, count: elementCount) + ) + #endif + return result.isEmpty ? nil : result + } + + func yield(at endIndex: Int) -> String? { + defer { + self.buffer.removeFirst(endIndex) + self.startIndex = 0 + } + if self.buffer.isEmpty { + return nil + } + return String(decoding: self.buffer[0 ..< endIndex], as: Encoding.self) + } + + // https://en.wikipedia.org/wiki/Newline#Unicode + let lineFeed = Encoding.CodeUnit(0x0A) + /// let verticalTab = Encoding.CodeUnit(0x0B) + /// let formFeed = Encoding.CodeUnit(0x0C) + let carriageReturn = Encoding.CodeUnit(0x0D) + // carriageReturn + lineFeed + let newLine: Encoding.CodeUnit + let lineSeparator: Encoding.CodeUnit + let paragraphSeparator: Encoding.CodeUnit + switch Encoding.CodeUnit.self { + case is UInt8.Type: + newLine = Encoding.CodeUnit(0xC2) // 0xC2 0x85 + lineSeparator = Encoding.CodeUnit(0xE2) // 0xE2 0x80 0xA8 + paragraphSeparator = Encoding.CodeUnit(0xE2) // 0xE2 0x80 0xA9 + case is UInt16.Type, is UInt32.Type: + newLine = Encoding.CodeUnit(0x0085) + lineSeparator = Encoding.CodeUnit(0x2028) + paragraphSeparator = Encoding.CodeUnit(0x2029) + default: + fatalError("Unknown encoding type \(Encoding.self)") + } + + while true { + // Step 1: Load more buffer if needed + if self.startIndex >= self.buffer.count { + guard let nextBuffer = try await loadBuffer() else { + // We have no more data + // Return the remaining data + return yield(at: self.buffer.count) + } + self.buffer += nextBuffer + } + // Step 2: Iterate through buffer to find next line + var currentIndex: Int = self.startIndex + for index in self.startIndex ..< self.buffer.count { + currentIndex = index + // Early return if we exceed max line length + if case .maxLineLength(let maxLength) = self.bufferingPolicy, + currentIndex >= maxLength { + return yield(at: currentIndex) + } + let byte = self.buffer[currentIndex] + switch byte { + case carriageReturn: + // Swallow any subsequent lineFeed if there is one + var targetIndex = currentIndex + if (currentIndex + 1) < self.buffer.count, self.buffer[currentIndex + 1] == lineFeed { + targetIndex = currentIndex + 1 + } + guard let result = yield(at: targetIndex + 1) else { + continue + } + return result + case lineFeed ..< carriageReturn: + guard let result = yield(at: currentIndex + 1) else { + continue + } + return result + case newLine: + var targetIndex = currentIndex + if Encoding.CodeUnit.self is UInt8.Type { + // For UTF8, look for the next 0x85 byte + guard (targetIndex + 1) < self.buffer.count, + self.buffer[targetIndex + 1] == Encoding.CodeUnit(0x85) else { + // Not a valid new ine. Keep looking + continue + } + // Swallow 0x85 byte + targetIndex += 1 + } + guard let result = yield(at: targetIndex + 1) else { + continue + } + return result + case lineSeparator, paragraphSeparator: + var targetIndex = currentIndex + if Encoding.CodeUnit.self is UInt8.Type { + // For UTF8, look for the next 0x80 byte + guard (targetIndex + 1) < self.buffer.count, + self.buffer[targetIndex + 1] == Encoding.CodeUnit(0x80) else { + // Not a valid new ine. Keep looking + continue + } + // Swallow 0x80 byte + targetIndex += 1 + // Look for the final 0xA8 (lineSeparator) or 0xA9 (paragraphSeparator) + guard (targetIndex + 1) < self.buffer.count, + (self.buffer[targetIndex + 1] == Encoding.CodeUnit(0xA8) || + self.buffer[targetIndex + 1] == Encoding.CodeUnit(0xA9)) else { + // Not a valid new ine. Keep looking + continue + } + // Swallow 0xA8 (or 0xA9) byte + targetIndex += 1 + } + guard let result = yield(at: targetIndex + 1) else { + continue + } + return result + default: + // Keep searching + continue + } + } + // There is no new line in the buffer. Load more buffer and try again + self.startIndex = currentIndex + 1 + } + } + } + + public func makeAsyncIterator() -> AsyncIterator { + return AsyncIterator( + underlyingIterator: self.base.makeAsyncIterator(), + bufferingPolicy: self.bufferingPolicy + ) + } + + internal init( + underlying: AsyncBufferSequence, + encoding: Encoding.Type, + bufferingPolicy: BufferingPolicy + ) { + self.base = underlying + self.bufferingPolicy = bufferingPolicy + } + } +} + +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif +extension AsyncBufferSequence.LineSequence { + public enum BufferingPolicy: Sendable { + /// Continue to add to the buffer, without imposing a limit + /// on the number of buffered elements (line length). + case unbounded + /// Impose a max buffer size (line length) limit. + /// When using this policy, `LineSequence` will return a line if: + /// - A newline character is encountered (standard behavior) + /// - The current line in the buffer reaches or exceeds the specified maximum length + case maxLineLength(Int) + } } // MARK: - Page Size diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 0450617..c0e648a 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -933,6 +933,124 @@ extension SubprocessUnixTests { } #expect(result == .unhandledException(SIGKILL)) } + + @Test func testLineSequence() async throws { + guard #available(SubprocessSpan , *) else { + return + } + + typealias TestCase = (value: String, count: Int, newLine: String) + enum TestCaseSize: CaseIterable { + case large // (1.0 ~ 2.0) * buffer size + case medium // (0.2 ~ 1.0) * buffer size + case small // Less than 16 characters + } + + let newLineCharacters: [[UInt8]] = [ + [0x0A], // Line feed + [0x0B], // Vertical tab + [0x0C], // Form feed + [0x0D], // Carriage return + [0x0D, 0x0A], // Carriage return + Line feed + [0xC2, 0x85], // New line + [0xE2, 0x80, 0xA8], // Line Separator + [0xE2, 0x80, 0xA9] // Paragraph separator + ] + + // Geneate test cases + func generateString(size: TestCaseSize) -> [UInt8] { + // Basic Latin has the range U+0020 ... U+007E + let range: ClosedRange = 0x20 ... 0x7E // 0x4E ... 0x5A + + let length: Int + switch size { + case .large: + length = Int(Double.random(in: 1.0 ..< 2.0) * Double(readBufferSize)) + case .medium: + length = Int(Double.random(in: 0.2 ..< 1.0) * Double(readBufferSize)) + case .small: + length = Int.random(in: 0 ..< 16) + } + + var buffer: [UInt8] = Array(repeating: 0, count: length) + for index in 0 ..< length { + buffer[index] = UInt8.random(in: range) + } + return buffer + } + + // Generate at least 2 long lines that is longer than buffer size + func generateTestCases(count: Int) -> [TestCase] { + var targetSizes: [TestCaseSize] = TestCaseSize.allCases.flatMap { + Array(repeating: $0, count: count / 3) + } + // Fill the remainder + let remaining = count - targetSizes.count + let rest = TestCaseSize.allCases.shuffled().prefix(remaining) + targetSizes.append(contentsOf: rest) + // Do a final shuffle to achiave random order + targetSizes.shuffle() + // Now generate test cases based on sizes + var testCases: [TestCase] = [] + for size in targetSizes { + let components = generateString(size: size) + // Choose a random new line + let newLine = newLineCharacters.randomElement()! + let string = String(decoding: components + newLine, as: UTF8.self) + testCases.append(( + value: string, + count: components.count + newLine.count, + newLine: String(decoding: newLine, as: UTF8.self) + )) + } + return testCases + } + + func writeTestCasesToFile(_ testCases: [TestCase], at url: URL) throws { + #if canImport(Darwin) + FileManager.default.createFile(atPath: url.path(), contents: nil, attributes: nil) + let fileHadle = try FileHandle(forWritingTo: url) + for testCase in testCases { + fileHadle.write(testCase.value.data(using: .utf8)!) + } + try fileHadle.close() + #else + var result = "" + for testCase in testCases { + result += testCase.value + } + try result.write(to: url, atomically: true, encoding: .utf8) + #endif + } + + let testCaseCount = 60 + let testFilePath = URL.temporaryDirectory.appending(path: "NewLines.txt") + if FileManager.default.fileExists(atPath: testFilePath.path()) { + try FileManager.default.removeItem(at: testFilePath) + } + let testCases = generateTestCases(count: testCaseCount) + try writeTestCasesToFile(testCases, at: testFilePath) + + _ = try await Subprocess.run( + .path("/bin/cat"), + arguments: [testFilePath.path()], + error: .discarded + ) { execution, standardOutput in + var index = 0 + for try await line in standardOutput.lines(encoding: UTF8.self) { + #expect( + line == testCases[index].value, + """ + Found mistachig line at index \(index) + Expected: [\(testCases[index].value)] + Actual: [\(line)] + Line Ending \(Array(testCases[index].newLine.utf8)) + """ + ) + index += 1 + } + } + } } // MARK: - Utils From b864ce475f99bb5d467ec05dc5ec68fd18796346 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Fri, 30 May 2025 11:05:49 -0700 Subject: [PATCH 4/8] Update Buffer to be based on DispatchData.Slice --- Sources/Subprocess/AsyncBufferSequence.swift | 32 ++-- Sources/Subprocess/Buffer.swift | 158 +++++++----------- Sources/Subprocess/IO/Output.swift | 73 +++----- .../Platforms/Subprocess+Unix.swift | 56 +------ .../Platforms/Subprocess+Windows.swift | 4 +- .../SubprocessTests+Unix.swift | 35 +++- 6 files changed, 150 insertions(+), 208 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index e884df1..eb42651 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -37,22 +37,23 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { public typealias Element = Buffer private let diskIO: DiskIO - private var buffer: [UInt8] - private var currentPosition: Int - private var finished: Bool + private var buffer: [Buffer] internal init(diskIO: DiskIO) { self.diskIO = diskIO self.buffer = [] - self.currentPosition = 0 - self.finished = false } - public func next() async throws -> Buffer? { - let data = try await self.diskIO.readChunk( + public mutating func next() async throws -> Buffer? { + // If we have more left in buffer, use that + guard self.buffer.isEmpty else { + return self.buffer.removeFirst() + } + // Read more data + let data = try await self.diskIO.read( upToLength: readBufferSize ) - if data == nil { + guard let data = data else { // We finished reading. Close the file descriptor now #if os(Windows) try self.diskIO.close() @@ -61,7 +62,15 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { #endif return nil } - return data + let createdBuffers = Buffer.createFrom(data) + // Most (all?) cases there should be only one buffer + // because DispatchData are motsly contiguous + if _fastPath(createdBuffers.count == 1) { + // No need to push to the stack + return createdBuffers[0] + } + self.buffer = createdBuffers + return self.buffer.removeFirst() } } @@ -144,7 +153,10 @@ extension AsyncBufferSequence { capacity: buffer.data.count ) defer { temporary.deallocate() } - let actualBytesCopied = buffer.data.copyBytes(to: temporary) + let actualBytesCopied = buffer.data.copyBytes( + to: temporary, + count: buffer.data.count + ) // Calculate how many CodePoint elements we have let elementCount = actualBytesCopied / MemoryLayout.stride diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index 135d856..432b90b 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -9,7 +9,9 @@ // //===----------------------------------------------------------------------===// +#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl) @preconcurrency internal import Dispatch +#endif #if SubprocessSpan @available(SubprocessSpan, *) @@ -18,16 +20,32 @@ extension AsyncBufferSequence { /// A immutable collection of bytes public struct Buffer: Sendable { #if os(Windows) - internal var data: [UInt8] + internal let data: [UInt8] internal init(data: [UInt8]) { self.data = data } + + internal static func createFrom(_ data: [UInt8]) -> [Buffer] { + return [.init(data: data)] + } #else - internal var data: DispatchData + // We need to keep the backingData alive while Slice is alive + internal let backingData: DispatchData + internal let data: DispatchData.Slice - internal init(data: DispatchData) { + internal init(data: DispatchData.Slice, backingData: DispatchData) { self.data = data + self.backingData = backingData + } + + internal static func createFrom(_ data: DispatchData) -> [Buffer] { + let slices = data.slices + // In most (all?) cases data should only have one slice + if _fastPath(slices.count == 1) { + return [.init(data: slices[0], backingData: data)] + } + return slices.map{ .init(data: $0, backingData: data) } } #endif } @@ -64,17 +82,7 @@ extension AsyncBufferSequence.Buffer { public func withUnsafeBytes( _ body: (UnsafeRawBufferPointer) throws -> ResultType ) rethrows -> ResultType { - #if os(Windows) return try self.data.withUnsafeBytes(body) - #else - // Although DispatchData was designed to be uncontiguous, in practice - // we found that almost all DispatchData are contiguous. Therefore - // we can access this body in O(1) most of the time. - return try self.data.withUnsafeBytes { ptr in - let bytes = UnsafeRawBufferPointer(start: ptr, count: self.data.count) - return try body(bytes) - } - #endif } #if SubprocessSpan @@ -82,52 +90,12 @@ extension AsyncBufferSequence.Buffer { public var bytes: RawSpan { @lifetime(borrow self) borrowing get { - var backing: SpanBacking? - #if os(Windows) - self.data.withUnsafeBufferPointer { - backing = .pointer($0) - } - #else - self.data.enumerateBytes { buffer, byteIndex, stop in - if _fastPath(backing == nil) { - // In practice, almost all `DispatchData` is contiguous - backing = .pointer(buffer) - } else { - // This DispatchData is not contiguous. We need to copy - // the bytes out - let contents = Array(buffer) - switch backing! { - case .pointer(let ptr): - // Convert the ptr to array - let existing = Array(ptr) - backing = .array(existing + contents) - case .array(let array): - backing = .array(array + contents) - } - } - } - #endif - guard let backing = backing else { - let empty = UnsafeRawBufferPointer(start: nil, count: 0) - let span = RawSpan(_unsafeBytes: empty) - return _overrideLifetime(of: span, to: self) - } - switch backing { - case .pointer(let ptr): - let span = RawSpan(_unsafeElements: ptr) - return _overrideLifetime(of: span, to: self) - case .array(let array): - let span = array.span.bytes - return _overrideLifetime(of: span, to: self) - } + let ptr = self.data.withUnsafeBytes { $0 } + let bytes = RawSpan(_unsafeBytes: ptr) + return _overrideLifetime(of: bytes, to: self) } } #endif // SubprocessSpan - - private enum SpanBacking { - case pointer(UnsafeBufferPointer) - case array([UInt8]) - } } // MARK: - Hashable, Equatable @@ -144,51 +112,53 @@ extension AsyncBufferSequence.Buffer: Equatable, Hashable { public func hash(into hasher: inout Hasher) { self.data.withUnsafeBytes { ptr in - let bytes = UnsafeRawBufferPointer( - start: ptr, - count: self.data.count - ) - hasher.combine(bytes: bytes) + hasher.combine(bytes: ptr) } } #endif } -// MARK: - Initializers -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif -extension String { - /// Create a String with the given encoding from `Buffer`. - /// - Parameters: - /// - buffer: the buffer to copy from - /// - encoding: the encoding to encode Self with - public init?(buffer: AsyncBufferSequence.Buffer, as encoding: Encoding.Type) { -#if os(Windows) - let source = buffer.data.map { Encoding.CodeUnit($0) } - self = String(decoding: source, as: encoding) -#else - self = buffer.withUnsafeBytes { ptr in - return String( - decoding: ptr.bindMemory(to: Encoding.CodeUnit.self).lazy.map { $0 }, - as: encoding - ) +// MARK: - DispatchData.Block +#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl) +extension DispatchData { + /// Unfortunitely `DispatchData.Region` is not available on Linux, hence our own wrapper + internal struct Slice: @unchecked Sendable, RandomAccessCollection { + typealias Element = UInt8 + + internal let bytes: UnsafeBufferPointer + + internal var startIndex: Int { self.bytes.startIndex } + internal var endIndex: Int { self.bytes.endIndex } + + internal init(bytes: UnsafeBufferPointer) { + self.bytes = bytes + } + + internal func withUnsafeBytes(_ body: (UnsafeRawBufferPointer) throws -> ResultType) rethrows -> ResultType { + return try body(UnsafeRawBufferPointer(self.bytes)) + } + + @discardableResult + internal func copyBytes( + to ptr: UnsafeMutableBufferPointer, count: Int + ) -> Int { + self.bytes.copyBytes(to: ptr, count: count) + } + + subscript(position: Int) -> UInt8 { + _read { + yield self.bytes[position] + } } -#endif } -} -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif -extension Array where Element == UInt8 { - /// Create an Array from `Buffer` - /// - Parameter buffer: the buffer to copy from - public init(buffer: AsyncBufferSequence.Buffer) { -#if os(Windows) - self = buffer.data -#else - self = Array(buffer.data) -#endif + internal var slices: [Slice] { + var slices = [Slice]() + enumerateBytes { (bytes, index, stop) in + slices.append(Slice(bytes: bytes)) + } + return slices } } + +#endif diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index dae7ede..395713c 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -161,29 +161,17 @@ public struct BytesOutput: OutputProtocol { internal func captureOutput( from diskIO: consuming TrackedPlatformDiskIO? ) async throws -> [UInt8] { - var diskIOBox: TrackedPlatformDiskIO? = consume diskIO - return try await withCheckedThrowingContinuation { continuation in - let _diskIO = diskIOBox.take() - guard let _diskIO = _diskIO else { - // Show not happen due to type system constraints - fatalError("Trying to capture output without file descriptor") - } - _diskIO.readUntilEOF(upToLength: self.maxSize) { result in - switch result { - case .success(let data): - // FIXME: remove workaround for - // rdar://143992296 - // https://github.com/swiftlang/swift-subprocess/issues/3 - #if os(Windows) - continuation.resume(returning: data) - #else - continuation.resume(returning: data.array()) - #endif - case .failure(let error): - continuation.resume(throwing: error) - } - } + guard let diskIO = diskIO else { + // Show not happen due to type system constraints + fatalError("Trying to capture output without file descriptor") } + #if os(Windows) + let result = try await diskIO.fileDescriptor.read(upToLength: self.maxSize) + #else + let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize) + #endif + + return result?.array() ?? [] } #if SubprocessSpan @@ -315,34 +303,23 @@ extension OutputProtocol { if let bytesOutput = self as? BytesOutput { return try await bytesOutput.captureOutput(from: diskIO) as! Self.OutputType } - var diskIOBox: TrackedPlatformDiskIO? = consume diskIO - return try await withCheckedThrowingContinuation { continuation in - if OutputType.self == Void.self { - continuation.resume(returning: () as! OutputType) - return - } - guard let _diskIO = diskIOBox.take() else { - // Show not happen due to type system constraints - fatalError("Trying to capture output without file descriptor") - } - _diskIO.readUntilEOF(upToLength: self.maxSize) { result in - do { - switch result { - case .success(let data): - // FIXME: remove workaround for - // rdar://143992296 - // https://github.com/swiftlang/swift-subprocess/issues/3 - let output = try self.output(from: data) - continuation.resume(returning: output) - case .failure(let error): - continuation.resume(throwing: error) - } - } catch { - continuation.resume(throwing: error) - } - } + if OutputType.self == Void.self { + return () as! OutputType + } + + guard let diskIO = diskIO else { + // Show not happen due to type system constraints + fatalError("Trying to capture output without file descriptor") } + + #if os(Windows) + let result = try await diskIO.fileDescriptor.read(upToLength: self.maxSize) + #else + let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize) + #endif + + return try self.output(from: result ?? .empty) } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index d61df5a..47a31c1 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -29,7 +29,7 @@ import Glibc import Musl #endif -internal import Dispatch +@preconcurrency internal import Dispatch // MARK: - Signals @@ -414,7 +414,7 @@ extension DispatchIO { #if SubprocessSpan @available(SubprocessSpan, *) #endif - internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { + internal func read(upToLength maxLength: Int) async throws -> DispatchData? { return try await withCheckedThrowingContinuation { continuation in var buffer: DispatchData = .empty self.read( @@ -440,7 +440,7 @@ extension DispatchIO { } if done { if !buffer.isEmpty { - continuation.resume(returning: AsyncBufferSequence.Buffer(data: buffer)) + continuation.resume(returning: buffer) } else { continuation.resume(returning: nil) } @@ -451,56 +451,6 @@ extension DispatchIO { } extension TrackedDispatchIO { - #if SubprocessSpan - @available(SubprocessSpan, *) - #endif - internal consuming func readUntilEOF( - upToLength maxLength: Int, - resultHandler: sending @escaping (Swift.Result) -> Void - ) { - var buffer: DispatchData? - self.dispatchIO.read( - offset: 0, - length: maxLength, - queue: .global() - ) { done, data, error in - guard error == 0, let chunkData = data else { - self.dispatchIO.close() - resultHandler( - .failure( - SubprocessError( - code: .init(.failedToReadFromSubprocess), - underlyingError: .init(rawValue: error) - ) - ) - ) - return - } - // Close dispatchIO if we are done - if done { - self.dispatchIO.close() - } - // Easy case: if we are done and buffer is nil, this means - // there is only one chunk of data - if done && buffer == nil { - buffer = chunkData - resultHandler(.success(chunkData)) - return - } - - if buffer == nil { - buffer = chunkData - } else { - buffer?.append(chunkData) - } - - if done { - resultHandler(.success(buffer!)) - return - } - } - } - #if SubprocessSpan @available(SubprocessSpan, *) internal func write( diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 2e80677..d81889a 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1071,7 +1071,7 @@ extension FileDescriptor { } extension FileDescriptor { - internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { + internal func read(upToLength maxLength: Int) async throws -> [UInt8]? { return try await withCheckedThrowingContinuation { continuation in self.readUntilEOF( upToLength: maxLength @@ -1080,7 +1080,7 @@ extension FileDescriptor { case .failure(let error): continuation.resume(throwing: error) case .success(let bytes): - continuation.resume(returning: AsyncBufferSequence.Buffer(data: bytes)) + continuation.resume(returning: bytes) } } } diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index c0e648a..6f0603f 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -605,7 +605,7 @@ extension SubprocessUnixTests { } } - @Test func testRedirectedOutputRedirectToSequence() async throws { + @Test func testRedirectedOutputWithUnsafeBytes() async throws { guard #available(SubprocessSpan , *) else { return } @@ -629,6 +629,30 @@ extension SubprocessUnixTests { #expect(catResult.value == expected) } + @Test func testRedirectedOutputBytes() async throws { + guard #available(SubprocessSpan , *) else { + return + } + + // Make ure we can read long text redirected to AsyncSequence + let expected: Data = try Data( + contentsOf: URL(filePath: theMysteriousIsland.string) + ) + let catResult = try await Subprocess.run( + .path("/bin/cat"), + arguments: [theMysteriousIsland.string], + error: .discarded + ) { execution, standardOutput in + var buffer: Data = Data() + for try await chunk in standardOutput { + buffer += Data(bytes: chunk.bytes) + } + return buffer + } + #expect(catResult.terminationStatus.isSuccess) + #expect(catResult.value == expected) + } + @Test func testBufferOutput() async throws { guard #available(SubprocessSpan , *) else { return @@ -664,6 +688,15 @@ extension SubprocessUnixTests { } } +extension Data { + init(bytes: borrowing RawSpan) { + let data = bytes.withUnsafeBytes { + return Data(bytes: $0.baseAddress!, count: $0.count) + } + self = data + } +} + // MARK: - PlatformOption Tests extension SubprocessUnixTests { // Run this test with sudo From d37e9f2a77b9a200efcc1c89cfa197863c958e5f Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Fri, 30 May 2025 11:57:46 -0700 Subject: [PATCH 5/8] Remove unnecessary availability check around AsyncBufferSequence and closure based run() --- Sources/Subprocess/API.swift | 24 ----------------- Sources/Subprocess/AsyncBufferSequence.swift | 9 ------- Sources/Subprocess/Buffer.swift | 16 +++-------- Sources/Subprocess/IO/Output.swift | 9 +++---- .../SubprocessTests+Unix.swift | 27 +++++-------------- 5 files changed, 15 insertions(+), 70 deletions(-) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index ddd4bf4..78b3e43 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -158,9 +158,6 @@ public func run< /// - body: The custom execution body to manually control the running process /// - Returns a ExecutableResult type containing the return value /// of the closure. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public func run( _ executable: Executable, arguments: Arguments = [], @@ -208,9 +205,6 @@ public func run( } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public func run( _ executable: Executable, arguments: Arguments = [], @@ -258,9 +252,6 @@ public func run( } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public func run( _ executable: Executable, arguments: Arguments = [], @@ -291,9 +282,6 @@ public func run( } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public func run( _ executable: Executable, arguments: Arguments = [], @@ -340,9 +328,6 @@ public func run( /// - body: The custom execution body to manually control the running process /// - Returns a ExecutableResult type containing the return value /// of the closure. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public func run( _ executable: Executable, arguments: Arguments = [], @@ -486,9 +471,6 @@ public func run< /// the running process and write to its standard input. /// - Returns a ExecutableResult type containing the return value /// of the closure. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public func run( _ configuration: Configuration, isolation: isolated (any Actor)? = #isolation, @@ -528,9 +510,6 @@ public func run( /// - output: A file descriptor to bind to the subprocess' standard output. /// - error: A file descriptor to bind to the subprocess' standard error. /// - Returns: the process identifier for the subprocess. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public func runDetached( _ executable: Executable, arguments: Arguments = [], @@ -564,9 +543,6 @@ public func runDetached( /// - output: A file descriptor to bind to the subprocess' standard output. /// - error: A file descriptor to bind to the subprocess' standard error. /// - Returns: the process identifier for the subprocess. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public func runDetached( _ configuration: Configuration, input: FileDescriptor? = nil, diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index eb42651..23d9c21 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -19,9 +19,6 @@ internal import Dispatch #endif -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public struct AsyncBufferSequence: AsyncSequence, Sendable { public typealias Failure = any Swift.Error public typealias Element = Buffer @@ -93,9 +90,6 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { } // MARK: - LineSequence -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension AsyncBufferSequence { public struct LineSequence: AsyncSequence, Sendable { public typealias Element = String @@ -308,9 +302,6 @@ extension AsyncBufferSequence { } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension AsyncBufferSequence.LineSequence { public enum BufferingPolicy: Sendable { /// Continue to add to the buffer, without imposing a limit diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index 432b90b..40c64c7 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -13,9 +13,7 @@ @preconcurrency internal import Dispatch #endif -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif + extension AsyncBufferSequence { /// A immutable collection of bytes public struct Buffer: Sendable { @@ -52,9 +50,6 @@ extension AsyncBufferSequence { } // MARK: - Properties -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension AsyncBufferSequence.Buffer { /// Number of bytes stored in the buffer public var count: Int { @@ -68,9 +63,6 @@ extension AsyncBufferSequence.Buffer { } // MARK: - Accessors -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension AsyncBufferSequence.Buffer { /// Access the raw bytes stored in this buffer /// - Parameter body: A closure with an `UnsafeRawBufferPointer` parameter that @@ -87,6 +79,9 @@ extension AsyncBufferSequence.Buffer { #if SubprocessSpan // Access the storge backing this Buffer + #if SubprocessSpan + @available(SubprocessSpan, *) + #endif public var bytes: RawSpan { @lifetime(borrow self) borrowing get { @@ -99,9 +94,6 @@ extension AsyncBufferSequence.Buffer { } // MARK: - Hashable, Equatable -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension AsyncBufferSequence.Buffer: Equatable, Hashable { #if os(Windows) // Compiler generated conformances diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index 395713c..24c2092 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -166,12 +166,11 @@ public struct BytesOutput: OutputProtocol { fatalError("Trying to capture output without file descriptor") } #if os(Windows) - let result = try await diskIO.fileDescriptor.read(upToLength: self.maxSize) + return try await diskIO.fileDescriptor.read(upToLength: self.maxSize) ?? [] #else let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize) - #endif - return result?.array() ?? [] + #endif } #if SubprocessSpan @@ -315,11 +314,11 @@ extension OutputProtocol { #if os(Windows) let result = try await diskIO.fileDescriptor.read(upToLength: self.maxSize) + return try self.output(from: result ?? []) #else let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize) - #endif - return try self.output(from: result ?? .empty) + #endif } } diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 6f0603f..fdba78e 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -420,9 +420,6 @@ extension SubprocessUnixTests { } @Test func testInputSequenceCustomExecutionBody() async throws { - guard #available(SubprocessSpan , *) else { - return - } let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) ) @@ -443,9 +440,6 @@ extension SubprocessUnixTests { } @Test func testInputAsyncSequenceCustomExecutionBody() async throws { - guard #available(SubprocessSpan , *) else { - return - } // Maeks ure we can read long text as AsyncSequence let fd: FileDescriptor = try .open(theMysteriousIsland, .readOnly) let expected: Data = try Data( @@ -606,9 +600,6 @@ extension SubprocessUnixTests { } @Test func testRedirectedOutputWithUnsafeBytes() async throws { - guard #available(SubprocessSpan , *) else { - return - } // Make ure we can read long text redirected to AsyncSequence let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) @@ -629,20 +620,16 @@ extension SubprocessUnixTests { #expect(catResult.value == expected) } + #if SubprocessSpan @Test func testRedirectedOutputBytes() async throws { - guard #available(SubprocessSpan , *) else { - return - } - // Make ure we can read long text redirected to AsyncSequence let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) ) let catResult = try await Subprocess.run( .path("/bin/cat"), - arguments: [theMysteriousIsland.string], - error: .discarded - ) { execution, standardOutput in + arguments: [theMysteriousIsland.string] + ) { (execution: Execution, standardOutput: AsyncBufferSequence) -> Data in var buffer: Data = Data() for try await chunk in standardOutput { buffer += Data(bytes: chunk.bytes) @@ -652,6 +639,7 @@ extension SubprocessUnixTests { #expect(catResult.terminationStatus.isSuccess) #expect(catResult.value == expected) } + #endif @Test func testBufferOutput() async throws { guard #available(SubprocessSpan , *) else { @@ -688,6 +676,8 @@ extension SubprocessUnixTests { } } +#if SubprocessSpan +@available(SubprocessSpan, *) extension Data { init(bytes: borrowing RawSpan) { let data = bytes.withUnsafeBytes { @@ -696,6 +686,7 @@ extension Data { self = data } } +#endif // MARK: - PlatformOption Tests extension SubprocessUnixTests { @@ -968,10 +959,6 @@ extension SubprocessUnixTests { } @Test func testLineSequence() async throws { - guard #available(SubprocessSpan , *) else { - return - } - typealias TestCase = (value: String, count: Int, newLine: String) enum TestCaseSize: CaseIterable { case large // (1.0 ~ 2.0) * buffer size From 07953c25ed4e6bb48c755a2caa237daa312c86bb Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Fri, 30 May 2025 12:17:25 -0700 Subject: [PATCH 6/8] Add documentation for the new closure based run() --- Sources/Subprocess/API.swift | 57 ++++++++++++++++--- .../Input+Foundation.swift | 4 +- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 78b3e43..4ef5b4f 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -152,7 +152,6 @@ public func run< /// - platformOptions: The platform specific options to use /// when running the executable. /// - input: The input to send to the executable. -/// - output: How to manage the executable standard ouput. /// - error: How to manager executable standard error. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process @@ -205,6 +204,21 @@ public func run( } } +/// Run a executable with given parameters and a custom closure +/// to manage the running subprocess' lifetime and stream its standard error. +/// - Parameters: +/// - executable: The executable to run. +/// - arguments: The arguments to pass to the executable. +/// - environment: The environment in which to run the executable. +/// - workingDirectory: The working directory in which to run the executable. +/// - platformOptions: The platform specific options to use +/// when running the executable. +/// - input: The input to send to the executable. +/// - output: How to manager executable standard output. +/// - isolation: the isolation context to run the body closure. +/// - body: The custom execution body to manually control the running process +/// - Returns a ExecutableResult type containing the return value +/// of the closure. public func run( _ executable: Executable, arguments: Arguments = [], @@ -252,6 +266,21 @@ public func run( } } +/// Run a executable with given parameters and a custom closure +/// to manage the running subprocess' lifetime, write to its +/// standard input, and stream its standard output. +/// - Parameters: +/// - executable: The executable to run. +/// - arguments: The arguments to pass to the executable. +/// - environment: The environment in which to run the executable. +/// - workingDirectory: The working directory in which to run the executable. +/// - platformOptions: The platform specific options to use +/// when running the executable. +/// - error: How to manager executable standard error. +/// - isolation: the isolation context to run the body closure. +/// - body: The custom execution body to manually control the running process +/// - Returns a ExecutableResult type containing the return value +/// of the closure. public func run( _ executable: Executable, arguments: Arguments = [], @@ -282,6 +311,21 @@ public func run( } } +/// Run a executable with given parameters and a custom closure +/// to manage the running subprocess' lifetime, write to its +/// standard input, and stream its standard error. +/// - Parameters: +/// - executable: The executable to run. +/// - arguments: The arguments to pass to the executable. +/// - environment: The environment in which to run the executable. +/// - workingDirectory: The working directory in which to run the executable. +/// - platformOptions: The platform specific options to use +/// when running the executable. +/// - output: How to manager executable standard output. +/// - isolation: the isolation context to run the body closure. +/// - body: The custom execution body to manually control the running process +/// - Returns a ExecutableResult type containing the return value +/// of the closure. public func run( _ executable: Executable, arguments: Arguments = [], @@ -313,8 +357,8 @@ public func run( } /// Run a executable with given parameters and a custom closure -/// to manage the running subprocess' lifetime and write to its -/// standard input via `StandardInputWriter` +/// to manage the running subprocess' lifetime, write to its +/// standard input, and stream its standard output and standard error. /// - Parameters: /// - executable: The executable to run. /// - arguments: The arguments to pass to the executable. @@ -322,8 +366,6 @@ public func run( /// - workingDirectory: The working directory in which to run the executable. /// - platformOptions: The platform specific options to use /// when running the executable. -/// - output:How to handle executable's standard output -/// - error: How to handle executable's standard error /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns a ExecutableResult type containing the return value @@ -464,11 +506,10 @@ public func run< /// Run a executable with given parameters specified by a `Configuration` /// - Parameters: /// - configuration: The `Subprocess` configuration to run. -/// - output: The method to use for redirecting the standard output. -/// - error: The method to use for redirecting the standard error. /// - isolation: the isolation context to run the body closure. /// - body: The custom configuration body to manually control -/// the running process and write to its standard input. +/// the running process, write to its standard input, stream +/// its standard output and standard error. /// - Returns a ExecutableResult type containing the return value /// of the closure. public func run( diff --git a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift index 39a53ce..a1db8ad 100644 --- a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift +++ b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift @@ -115,7 +115,7 @@ extension StandardInputWriter { } /// Write a AsyncSequence of Data to the standard input of the subprocess. - /// - Parameter sequence: The sequence of bytes to write. + /// - Parameter asyncSequence: The sequence of bytes to write. /// - Returns number of bytes written. public func write( _ asyncSequence: AsyncSendableSequence @@ -135,7 +135,7 @@ extension TrackedFileDescriptor { ) async throws -> Int { let fileDescriptor = self.fileDescriptor return try await withCheckedThrowingContinuation { continuation in - // TODO: Figure out a better way to asynchornously write + // TODO: Figure out a better way to asynchronously write DispatchQueue.global(qos: .userInitiated).async { data.withUnsafeBytes { Self.write( From 00bcd9979bfb70ab224b63967e6a6bfd2056ed74 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Tue, 3 Jun 2025 13:30:01 -0700 Subject: [PATCH 7/8] Fix typos througout the repo --- README.md | 32 +++------- Sources/Subprocess/API.swift | 30 +++++----- Sources/Subprocess/AsyncBufferSequence.swift | 12 ++-- Sources/Subprocess/Buffer.swift | 4 +- Sources/Subprocess/Configuration.swift | 18 ++---- Sources/Subprocess/Execution.swift | 5 +- Sources/Subprocess/IO/Input.swift | 4 +- Sources/Subprocess/IO/Output.swift | 59 ++++--------------- .../Platforms/Subprocess+Darwin.swift | 25 ++++---- .../Platforms/Subprocess+Linux.swift | 8 +-- .../Platforms/Subprocess+Unix.swift | 12 +--- .../Platforms/Subprocess+Windows.swift | 46 +++++++-------- Sources/Subprocess/Teardown.swift | 16 ++--- .../SubprocessTests+Linux.swift | 4 +- .../SubprocessTests+Unix.swift | 34 +++++------ .../SubprocessTests+Windows.swift | 28 ++++----- 16 files changed, 130 insertions(+), 207 deletions(-) diff --git a/README.md b/README.md index 4a9eccd..49cb867 100644 --- a/README.md +++ b/README.md @@ -64,27 +64,17 @@ To have more precise control over input and output, you can provide a custom clo ```swift import Subprocess -let result = try await run( - .path("/bin/dd"), - arguments: ["if=/path/to/document"] -) { execution in - var contents = "" - for try await chunk in execution.standardOutput { - let string = chunk.withUnsafeBytes { String(decoding: $0, as: UTF8.self) } - contents += string - if string == "Done" { - // Stop execution - await execution.teardown( - using: [ - .gracefulShutDown( - allowedDurationToNextStep: .seconds(0.5) - ) - ] - ) - return contents +// Monitor Nginx log via `tail -f` +async let monitorResult = run( + .path("/usr/bin/tail"), + arguments: ["-f", "/path/to/nginx.log"] +) { execution, standardOutput in + for try await line in standardOutput.lines(encoding: UTF8.self) { + // Parse the log text + if line.contains("500") { + // Oh no, 500 error } } - return contents } ``` @@ -240,10 +230,6 @@ This option collects output as `[UInt8]`. Use it by setting `.bytes` or `.bytes(limit:)` for `input` or `error`. -#### `SequenceOutput`: - -This option redirects the child output to the `.standardOutput` or `.standardError` property of `Execution`. It’s only for the `run()` family that takes a custom closure. - ### Cross-platform support diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 4ef5b4f..a8ab1ef 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -143,7 +143,7 @@ public func run< // MARK: - Custom Execution Body /// Run an executable with given parameters and a custom closure -/// to manage the running subprocess' lifetime and its IOs. +/// to manage the running subprocess' lifetime and stream its standard output. /// - Parameters: /// - executable: The executable to run. /// - arguments: The arguments to pass to the executable. @@ -155,7 +155,7 @@ public func run< /// - error: How to manager executable standard error. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process -/// - Returns a ExecutableResult type containing the return value +/// - Returns an executableResult type containing the return value /// of the closure. public func run( _ executable: Executable, @@ -204,7 +204,7 @@ public func run( } } -/// Run a executable with given parameters and a custom closure +/// Run an executable with given parameters and a custom closure /// to manage the running subprocess' lifetime and stream its standard error. /// - Parameters: /// - executable: The executable to run. @@ -217,7 +217,7 @@ public func run( /// - output: How to manager executable standard output. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process -/// - Returns a ExecutableResult type containing the return value +/// - Returns an executableResult type containing the return value /// of the closure. public func run( _ executable: Executable, @@ -266,7 +266,7 @@ public func run( } } -/// Run a executable with given parameters and a custom closure +/// Run an executable with given parameters and a custom closure /// to manage the running subprocess' lifetime, write to its /// standard input, and stream its standard output. /// - Parameters: @@ -279,7 +279,7 @@ public func run( /// - error: How to manager executable standard error. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process -/// - Returns a ExecutableResult type containing the return value +/// - Returns an executableResult type containing the return value /// of the closure. public func run( _ executable: Executable, @@ -311,7 +311,7 @@ public func run( } } -/// Run a executable with given parameters and a custom closure +/// Run an executable with given parameters and a custom closure /// to manage the running subprocess' lifetime, write to its /// standard input, and stream its standard error. /// - Parameters: @@ -324,7 +324,7 @@ public func run( /// - output: How to manager executable standard output. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process -/// - Returns a ExecutableResult type containing the return value +/// - Returns an executableResult type containing the return value /// of the closure. public func run( _ executable: Executable, @@ -356,7 +356,7 @@ public func run( } } -/// Run a executable with given parameters and a custom closure +/// Run an executable with given parameters and a custom closure /// to manage the running subprocess' lifetime, write to its /// standard input, and stream its standard output and standard error. /// - Parameters: @@ -368,7 +368,7 @@ public func run( /// when running the executable. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process -/// - Returns a ExecutableResult type containing the return value +/// - Returns an executableResult type containing the return value /// of the closure. public func run( _ executable: Executable, @@ -411,7 +411,7 @@ public func run( // MARK: - Configuration Based -/// Run a `Configuration` asynchrously and returns +/// Run a `Configuration` asynchronously and returns /// a `CollectedResult` containing the output of the child process. /// - Parameters: /// - configuration: The `Subprocess` configuration to run. @@ -503,14 +503,14 @@ public func run< ) } -/// Run a executable with given parameters specified by a `Configuration` +/// Run an executable with given parameters specified by a `Configuration` /// - Parameters: /// - configuration: The `Subprocess` configuration to run. /// - isolation: the isolation context to run the body closure. /// - body: The custom configuration body to manually control /// the running process, write to its standard input, stream /// its standard output and standard error. -/// - Returns a ExecutableResult type containing the return value +/// - Returns an executableResult type containing the return value /// of the closure. public func run( _ configuration: Configuration, @@ -534,7 +534,7 @@ public func run( // MARK: - Detached -/// Run a executable with given parameters and return its process +/// Run an executable with given parameters and return its process /// identifier immediately without monitoring the state of the /// subprocess nor waiting until it exits. /// @@ -571,7 +571,7 @@ public func runDetached( return try runDetached(config, input: input, output: output, error: error) } -/// Run a executable with given configuration and return its process +/// Run an executable with given configuration and return its process /// identifier immediately without monitoring the state of the /// subprocess nor waiting until it exits. /// diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 23d9c21..ed1979e 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -50,7 +50,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { let data = try await self.diskIO.read( upToLength: readBufferSize ) - guard let data = data else { + guard let data else { // We finished reading. Close the file descriptor now #if os(Windows) try self.diskIO.close() @@ -61,7 +61,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { } let createdBuffers = Buffer.createFrom(data) // Most (all?) cases there should be only one buffer - // because DispatchData are motsly contiguous + // because DispatchData are mostly contiguous if _fastPath(createdBuffers.count == 1) { // No need to push to the stack return createdBuffers[0] @@ -139,7 +139,7 @@ extension AsyncBufferSequence { ) } #else - // Unfortunitely here we _have to_ copy the bytes out because + // Unfortunately here we _have to_ copy the bytes out because // DisptachIO (rightfully) reuses buffer, which means `buffer.data` // has the same address on all iterations, therefore we can't directly // create the result array from buffer.data @@ -238,7 +238,7 @@ extension AsyncBufferSequence { // For UTF8, look for the next 0x85 byte guard (targetIndex + 1) < self.buffer.count, self.buffer[targetIndex + 1] == Encoding.CodeUnit(0x85) else { - // Not a valid new ine. Keep looking + // Not a valid new line. Keep looking continue } // Swallow 0x85 byte @@ -254,7 +254,7 @@ extension AsyncBufferSequence { // For UTF8, look for the next 0x80 byte guard (targetIndex + 1) < self.buffer.count, self.buffer[targetIndex + 1] == Encoding.CodeUnit(0x80) else { - // Not a valid new ine. Keep looking + // Not a valid new line. Keep looking continue } // Swallow 0x80 byte @@ -263,7 +263,7 @@ extension AsyncBufferSequence { guard (targetIndex + 1) < self.buffer.count, (self.buffer[targetIndex + 1] == Encoding.CodeUnit(0xA8) || self.buffer[targetIndex + 1] == Encoding.CodeUnit(0xA9)) else { - // Not a valid new ine. Keep looking + // Not a valid new line. Keep looking continue } // Swallow 0xA8 (or 0xA9) byte diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index 40c64c7..6ed9839 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -78,7 +78,7 @@ extension AsyncBufferSequence.Buffer { } #if SubprocessSpan - // Access the storge backing this Buffer + // Access the storage backing this Buffer #if SubprocessSpan @available(SubprocessSpan, *) #endif @@ -113,7 +113,7 @@ extension AsyncBufferSequence.Buffer: Equatable, Hashable { // MARK: - DispatchData.Block #if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl) extension DispatchData { - /// Unfortunitely `DispatchData.Region` is not available on Linux, hence our own wrapper + /// Unfortunately `DispatchData.Region` is not available on Linux, hence our own wrapper internal struct Slice: @unchecked Sendable, RandomAccessCollection { typealias Element = UInt8 diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 9895ea5..af76c46 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -40,7 +40,7 @@ public struct Configuration: Sendable { public var environment: Environment /// The working directory to use when running the executable. public var workingDirectory: FilePath - /// The platform specifc options to use when + /// The platform specific options to use when /// running the subprocess. public var platformOptions: PlatformOptions @@ -58,9 +58,6 @@ public struct Configuration: Sendable { self.platformOptions = platformOptions } - #if SubprocessSpan - @available(SubprocessSpan, *) - #endif internal func run( input: consuming CreatedPipe, output: consuming CreatedPipe, @@ -134,7 +131,7 @@ extension Configuration: CustomStringConvertible, CustomDebugStringConvertible { extension Configuration { /// Close each input individually, and throw the first error if there's multiple errors thrown @Sendable - internal func safelyCloseMultuple( + internal func safelyCloseMultiple( inputRead: consuming TrackedFileDescriptor?, inputWrite: consuming TrackedFileDescriptor?, outputRead: consuming TrackedFileDescriptor?, @@ -393,7 +390,7 @@ extension Environment: CustomStringConvertible, CustomDebugStringConvertible { """ case .inherit(let updateValue): return """ - Inherting current environment with updates: + Inheriting current environment with updates: \(updateValue) """ #if !os(Windows) @@ -454,7 +451,7 @@ public enum TerminationStatus: Sendable, Hashable, Codable { /// The subprocess was existed with the given code case exited(Code) - /// The subprocess was signalled with given exception value + /// The subprocess was signaled with given exception value case unhandledException(Code) /// Whether the current TerminationStatus is successful. public var isSuccess: Bool { @@ -485,9 +482,6 @@ extension TerminationStatus: CustomStringConvertible, CustomDebugStringConvertib // MARK: - Internal extension Configuration { - #if SubprocessSpan - @available(SubprocessSpan, *) - #endif /// After Spawn finishes, child side file descriptors /// (input read, output write, error write) will be closed /// by `spawn()`. It returns the parent side file descriptors @@ -581,7 +575,7 @@ internal enum StringOrRawBytes: Sendable, Hashable { } /// A wrapped `FileDescriptor` and whether it should be closed -/// automactially when done. +/// automatically when done. internal struct TrackedFileDescriptor: ~Copyable { internal var closeWhenDone: Bool internal let fileDescriptor: FileDescriptor @@ -667,7 +661,7 @@ internal struct TrackedFileDescriptor: ~Copyable { #if !os(Windows) /// A wrapped `DispatchIO` and whether it should be closed -/// automactially when done. +/// automatically when done. internal struct TrackedDispatchIO: ~Copyable { internal var closeWhenDone: Bool internal var dispatchIO: DispatchIO diff --git a/Sources/Subprocess/Execution.swift b/Sources/Subprocess/Execution.swift index c783c22..0aa06a7 100644 --- a/Sources/Subprocess/Execution.swift +++ b/Sources/Subprocess/Execution.swift @@ -27,12 +27,9 @@ import Musl import WinSDK #endif -/// An object that repersents a subprocess that has been +/// An object that represents a subprocess that has been /// executed. You can use this object to send signals to the /// child process as well as stream its output and error. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public struct Execution: Sendable { /// The process identifier of the current execution public let processIdentifier: ProcessIdentifier diff --git a/Sources/Subprocess/IO/Input.swift b/Sources/Subprocess/IO/Input.swift index 65e5da8..90904e8 100644 --- a/Sources/Subprocess/IO/Input.swift +++ b/Sources/Subprocess/IO/Input.swift @@ -53,7 +53,7 @@ public struct NoInput: InputProtocol { writeFileDescriptor: nil ) #else - let devnull: FileDescriptor = try .openDevNull(withAcessMode: .readOnly) + let devnull: FileDescriptor = try .openDevNull(withAccessMode: .readOnly) return CreatedPipe( readFileDescriptor: .init(devnull, closeWhenDone: true), writeFileDescriptor: nil @@ -147,7 +147,7 @@ internal struct CustomWriteInput: InputProtocol { } extension InputProtocol where Self == NoInput { - /// Create a Subprocess input that specfies there is no input + /// Create a Subprocess input that specifies there is no input public static var none: Self { .init() } } diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index 24c2092..77aead8 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -23,14 +23,12 @@ internal import Dispatch /// Instead of developing custom implementations of `OutputProtocol`, /// it is recommended to utilize the default implementations provided /// by the `Subprocess` library to specify the output handling requirements. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public protocol OutputProtocol: Sendable, ~Copyable { associatedtype OutputType: Sendable #if SubprocessSpan /// Convert the output from span to expected output type + @available(SubprocessSpan, *) func output(from span: RawSpan) throws -> OutputType #endif @@ -41,9 +39,6 @@ public protocol OutputProtocol: Sendable, ~Copyable { var maxSize: Int { get } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension OutputProtocol { /// The max amount of data to collect for this output. public var maxSize: Int { 128 * 1024 } @@ -55,9 +50,6 @@ extension OutputProtocol { /// redirects the standard output of the subprocess to `/dev/null`, /// while on Windows, it does not bind any file handle to the /// subprocess standard output handle. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public struct DiscardedOutput: OutputProtocol { public typealias OutputType = Void @@ -71,7 +63,7 @@ public struct DiscardedOutput: OutputProtocol { writeFileDescriptor: nil ) #else - let devnull: FileDescriptor = try .openDevNull(withAcessMode: .readOnly) + let devnull: FileDescriptor = try .openDevNull(withAccessMode: .readOnly) return CreatedPipe( readFileDescriptor: nil, writeFileDescriptor: .init(devnull, closeWhenDone: true) @@ -87,9 +79,6 @@ public struct DiscardedOutput: OutputProtocol { /// Developers have the option to instruct the `Subprocess` to /// automatically close the provided `FileDescriptor` /// after the subprocess is spawned. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public struct FileDescriptorOutput: OutputProtocol { public typealias OutputType = Void @@ -119,14 +108,12 @@ public struct FileDescriptorOutput: OutputProtocol { /// from the subprocess as `String` with the given encoding. /// This option must be used with he `run()` method that /// returns a `CollectedResult`. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public struct StringOutput: OutputProtocol { public typealias OutputType = String? public let maxSize: Int #if SubprocessSpan + @available(SubprocessSpan, *) public func output(from span: RawSpan) throws -> String? { // FIXME: Span to String var array: [UInt8] = [] @@ -135,13 +122,12 @@ public struct StringOutput: OutputProtocol { } return String(decodingBytes: array, as: Encoding.self) } - #else + #endif public func output(from buffer: some Sequence) throws -> String? { // FIXME: Span to String let array = Array(buffer) return String(decodingBytes: array, as: Encoding.self) } - #endif internal init(limit: Int, encoding: Encoding.Type) { self.maxSize = limit @@ -151,9 +137,6 @@ public struct StringOutput: OutputProtocol { /// A concrete `Output` type for subprocesses that collects output /// from the subprocess as `[UInt8]`. This option must be used with /// the `run()` method that returns a `CollectedResult` -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif public struct BytesOutput: OutputProtocol { public typealias OutputType = [UInt8] public let maxSize: Int @@ -174,14 +157,14 @@ public struct BytesOutput: OutputProtocol { } #if SubprocessSpan + @available(SubprocessSpan, *) public func output(from span: RawSpan) throws -> [UInt8] { fatalError("Not implemented") } - #else + #endif public func output(from buffer: some Sequence) throws -> [UInt8] { fatalError("Not implemented") } - #endif internal init(limit: Int) { self.maxSize = limit @@ -192,26 +175,17 @@ public struct BytesOutput: OutputProtocol { /// the child output to the `.standardOutput` (a sequence) or `.standardError` /// property of `Execution`. This output type is /// only applicable to the `run()` family that takes a custom closure. -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif internal struct SequenceOutput: OutputProtocol { public typealias OutputType = Void internal init() {} } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension OutputProtocol where Self == DiscardedOutput { /// Create a Subprocess output that discards the output public static var discarded: Self { .init() } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension OutputProtocol where Self == FileDescriptorOutput { /// Create a Subprocess output that writes output to a `FileDescriptor` /// and optionally close the `FileDescriptor` once process spawned. @@ -223,9 +197,6 @@ extension OutputProtocol where Self == FileDescriptorOutput { } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension OutputProtocol where Self == StringOutput { /// Create a `Subprocess` output that collects output as /// UTF8 String with 128kb limit. @@ -234,9 +205,6 @@ extension OutputProtocol where Self == StringOutput { } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension OutputProtocol { /// Create a `Subprocess` output that collects output as /// `String` using the given encoding up to limit it bytes. @@ -248,9 +216,7 @@ extension OutputProtocol { } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif + extension OutputProtocol where Self == BytesOutput { /// Create a `Subprocess` output that collects output as /// `Buffer` with 128kb limit. @@ -279,9 +245,6 @@ extension OutputProtocol { // MARK: - Default Implementations -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension OutputProtocol { @_disfavoredOverload internal func createPipe() throws -> CreatedPipe { @@ -322,22 +285,20 @@ extension OutputProtocol { } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension OutputProtocol where OutputType == Void { internal func captureOutput(from fileDescriptor: consuming TrackedPlatformDiskIO?) async throws {} #if SubprocessSpan /// Convert the output from Data to expected output type + @available(SubprocessSpan, *) public func output(from span: RawSpan) throws { // noop } - #else + #endif + public func output(from buffer: some Sequence) throws { // noop } - #endif // SubprocessSpan } #if SubprocessSpan diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index 9a56106..0b93cd7 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -59,7 +59,7 @@ public struct PlatformOptions: Sendable { public var createSession: Bool = false /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before - /// the child proces terminates. + /// the child process terminates. /// Always ends in sending a `.kill` signal at the end. public var teardownSequence: [TeardownStep] = [] /// A closure to configure platform-specific @@ -153,9 +153,6 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible // MARK: - Spawn extension Configuration { - #if SubprocessSpan - @available(SubprocessSpan, *) - #endif internal func spawn( withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, @@ -209,7 +206,7 @@ extension Configuration { result = posix_spawn_file_actions_adddup2( &fileActions, inputReadFileDescriptor!.platformDescriptor(), 0) guard result == 0 else { - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -229,7 +226,7 @@ extension Configuration { &fileActions, inputWriteFileDescriptor!.platformDescriptor() ) guard result == 0 else { - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -249,7 +246,7 @@ extension Configuration { &fileActions, outputWriteFileDescriptor!.platformDescriptor(), 1 ) guard result == 0 else { - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -269,7 +266,7 @@ extension Configuration { &fileActions, outputReadFileDescriptor!.platformDescriptor() ) guard result == 0 else { - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -289,7 +286,7 @@ extension Configuration { &fileActions, errorWriteFileDescriptor!.platformDescriptor(), 2 ) guard result == 0 else { - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -309,7 +306,7 @@ extension Configuration { &fileActions, errorReadFileDescriptor!.platformDescriptor() ) guard result == 0 else { - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -359,7 +356,7 @@ extension Configuration { // Error handling if chdirError != 0 || spawnAttributeError != 0 { - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -412,7 +409,7 @@ extension Configuration { continue } // Throw all other errors - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -427,7 +424,7 @@ extension Configuration { } // After spawn finishes, close all child side fds - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: nil, outputRead: nil, @@ -452,7 +449,7 @@ extension Configuration { // provide which one is not valid, here we make a best effort guess // by checking whether the working directory is valid. This technically // still causes TOUTOC issue, but it's the best we can do for error recovery. - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index a488a52..84210db 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -121,7 +121,7 @@ extension Configuration { continue } // Throw all other errors - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -143,7 +143,7 @@ extension Configuration { } } // After spawn finishes, close all child side fds - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: nil, outputRead: nil, @@ -168,7 +168,7 @@ extension Configuration { // provide which one is not valid, here we make a best effort guess // by checking whether the working directory is valid. This technically // still causes TOUTOC issue, but it's the best we can do for error recovery. - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -357,7 +357,7 @@ private let setup: () = { }() /// Unchecked Sendable here since this class is only explicitly -/// initialzied once during the lifetime of the process +/// initialized once during the lifetime of the process final class SendableSourceSignal: @unchecked Sendable { private let signalSource: DispatchSourceSignal diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 47a31c1..ced3db1 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -108,9 +108,6 @@ extension ProcessIdentifier: CustomStringConvertible, CustomDebugStringConvertib public var debugDescription: String { "\(self.value)" } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension Execution { /// Send the given signal to the child process. /// - Parameters: @@ -378,7 +375,7 @@ extension FileDescriptor { } internal static func openDevNull( - withAcessMode mode: FileDescriptor.AccessMode + withAccessMode mode: FileDescriptor.AccessMode ) throws -> FileDescriptor { let devnull: FileDescriptor = try .open("/dev/null", mode) return devnull @@ -411,9 +408,6 @@ extension TrackedFileDescriptor { // MARK: - TrackedDispatchIO extensions extension DispatchIO { - #if SubprocessSpan - @available(SubprocessSpan, *) - #endif internal func read(upToLength maxLength: Int) async throws -> DispatchData? { return try await withCheckedThrowingContinuation { continuation in var buffer: DispatchData = .empty @@ -451,7 +445,7 @@ extension DispatchIO { } extension TrackedDispatchIO { -#if SubprocessSpan + #if SubprocessSpan @available(SubprocessSpan, *) internal func write( _ span: borrowing RawSpan @@ -477,7 +471,7 @@ extension TrackedDispatchIO { } } } -#endif // SubprocessSpan + #endif // SubprocessSpan internal func write( _ array: [UInt8] diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index d81889a..d879de2 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -102,7 +102,7 @@ extension Configuration { ) guard created else { let windowsError = GetLastError() - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor.take(), inputWrite: inputWriteFileDescriptor.take(), outputRead: outputReadFileDescriptor.take(), @@ -122,7 +122,7 @@ extension Configuration { // We don't need the handle objects, so close it right away guard CloseHandle(processInfo.hThread) else { let windowsError = GetLastError() - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -137,7 +137,7 @@ extension Configuration { } guard CloseHandle(processInfo.hProcess) else { let windowsError = GetLastError() - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -151,7 +151,7 @@ extension Configuration { ) } - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: nil, outputRead: nil, @@ -209,7 +209,7 @@ extension Configuration { if let configurator = self.platformOptions.preSpawnProcessConfigurator { try configurator(&createProcessFlags, &startupInfo) } - // Spawn (featuring pyamid!) + // Spawn (featuring pyramid!) try userCredentials.username.withCString( encodedAs: UTF16.self ) { usernameW in @@ -242,7 +242,7 @@ extension Configuration { ) guard created else { let windowsError = GetLastError() - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor.take(), inputWrite: inputWriteFileDescriptor.take(), outputRead: outputReadFileDescriptor.take(), @@ -265,7 +265,7 @@ extension Configuration { // We don't need the handle objects, so close it right away guard CloseHandle(processInfo.hThread) else { let windowsError = GetLastError() - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -280,7 +280,7 @@ extension Configuration { } guard CloseHandle(processInfo.hProcess) else { let windowsError = GetLastError() - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -295,7 +295,7 @@ extension Configuration { } // After spawn finishes, close all child side fds - try self.safelyCloseMultuple( + try self.safelyCloseMultiple( inputRead: inputReadFileDescriptor, inputWrite: nil, outputRead: nil, @@ -343,7 +343,7 @@ public struct PlatformOptions: Sendable { public struct ConsoleBehavior: Sendable, Hashable { internal enum Storage: Sendable, Hashable { case createNew - case detatch + case detach case inherit } @@ -360,7 +360,7 @@ public struct PlatformOptions: Sendable { /// inherit its parent's console (the default). /// The new process can call the `AllocConsole` /// function at a later time to create a console. - public static let detatch: Self = .init(.detatch) + public static let detach: Self = .init(.detach) /// The subprocess inherits its parent's console. public static let inherit: Self = .init(.inherit) } @@ -415,7 +415,7 @@ public struct PlatformOptions: Sendable { public var createProcessGroup: Bool = false /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before - /// the child proces terminates. + /// the child process terminates. /// Always ends in forcefully terminate at the end. public var teardownSequence: [TeardownStep] = [] /// A closure to configure platform-specific @@ -669,7 +669,7 @@ extension Executable { return try pathValue.withOptionalCString( encodedAs: UTF16.self ) { path -> String in - let pathLenth = SearchPathW( + let pathLength = SearchPathW( path, exeName, nil, @@ -677,7 +677,7 @@ extension Executable { nil, nil ) - guard pathLenth > 0 else { + guard pathLength > 0 else { throw SubprocessError( code: .init(.executableNotFound(executableName)), underlyingError: .init(rawValue: GetLastError()) @@ -685,13 +685,13 @@ extension Executable { } return withUnsafeTemporaryAllocation( of: WCHAR.self, - capacity: Int(pathLenth) + 1 + capacity: Int(pathLength) + 1 ) { _ = SearchPathW( path, exeName, nil, - pathLenth + 1, + pathLength + 1, $0.baseAddress, nil ) @@ -750,7 +750,7 @@ extension Environment { /// A platform independent identifier for a subprocess. public struct ProcessIdentifier: Sendable, Hashable, Codable { - /// Windows specifc process identifier value + /// Windows specific process identifier value public let value: DWORD internal init(value: DWORD) { @@ -832,7 +832,7 @@ extension Configuration { switch self.platformOptions.consoleBehavior.storage { case .createNew: flags |= CREATE_NEW_CONSOLE - case .detatch: + case .detach: flags |= DETACHED_PROCESS case .inherit: break @@ -865,7 +865,7 @@ extension Configuration { info.hStdInput = inputReadFileDescriptor!.platformDescriptor() } if inputWriteFileDescriptor != nil { - // Set parent side to be uninhertable + // Set parent side to be uninheritable SetHandleInformation( inputWriteFileDescriptor!.platformDescriptor(), DWORD(HANDLE_FLAG_INHERIT), @@ -877,7 +877,7 @@ extension Configuration { info.hStdOutput = outputWriteFileDescriptor!.platformDescriptor() } if outputReadFileDescriptor != nil { - // Set parent side to be uninhertable + // Set parent side to be uninheritable SetHandleInformation( outputReadFileDescriptor!.platformDescriptor(), DWORD(HANDLE_FLAG_INHERIT), @@ -889,7 +889,7 @@ extension Configuration { info.hStdError = errorWriteFileDescriptor!.platformDescriptor() } if errorReadFileDescriptor != nil { - // Set parent side to be uninhertable + // Set parent side to be uninheritable SetHandleInformation( errorReadFileDescriptor!.platformDescriptor(), DWORD(HANDLE_FLAG_INHERIT), @@ -1123,7 +1123,7 @@ extension FileDescriptor { } break } else { - // We succesfully read the current round + // We successfully read the current round totalBytesRead += Int(bytesRead) } @@ -1192,7 +1192,7 @@ extension TrackedFileDescriptor { _ array: [UInt8] ) async throws -> Int { try await withCheckedThrowingContinuation { continuation in - // TODO: Figure out a better way to asynchornously write + // TODO: Figure out a better way to asynchronously write let fd = self.fileDescriptor DispatchQueue.global(qos: .userInitiated).async { array.withUnsafeBytes { diff --git a/Sources/Subprocess/Teardown.swift b/Sources/Subprocess/Teardown.swift index 66a4ce2..f9d1146 100644 --- a/Sources/Subprocess/Teardown.swift +++ b/Sources/Subprocess/Teardown.swift @@ -73,9 +73,6 @@ public struct TeardownStep: Sendable, Hashable { } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension Execution { /// Performs a sequence of teardown steps on the Subprocess. /// Teardown sequence always ends with a `.kill` signal @@ -88,7 +85,7 @@ extension Execution { using sequence: some Sequence & Sendable, on processIdentifier: ProcessIdentifier ) async { - await withUncancelledTask { + await withUncanceledTask { await Self.runTeardownSequence(sequence, on: processIdentifier) } } @@ -100,9 +97,6 @@ internal enum TeardownStepCompletion { case killedTheProcess } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif extension Execution { internal static func gracefulShutDown( _ processIdentifier: ProcessIdentifier, @@ -177,7 +171,7 @@ extension Execution { try await Task.sleep(for: allowedDuration) return .processStillAlive } catch { - // teardown(using:) cancells this task + // teardown(using:) cancels this task // when process has exited return .processHasExited } @@ -196,7 +190,7 @@ extension Execution { try await Task.sleep(for: allowedDuration) return .processStillAlive } catch { - // teardown(using:) cancells this task + // teardown(using:) cancels this task // when process has exited return .processHasExited } @@ -228,14 +222,14 @@ extension Execution { } } -func withUncancelledTask( +func withUncanceledTask( returning: Result.Type = Result.self, _ body: @Sendable @escaping () async -> Result ) async -> Result { // This looks unstructured but it isn't, please note that we `await` `.value` of this task. // The reason we need this separate `Task` is that in general, we cannot assume that code performs to our // expectations if the task we run it on is already cancelled. However, in some cases we need the code to - // run regardless -- even if our task is already cancelled. Therefore, we create a new, uncancelled task here. + // run regardless -- even if our task is already cancelled. Therefore, we create a new, uncanceled task here. await Task { await body() }.value diff --git a/Tests/SubprocessTests/SubprocessTests+Linux.swift b/Tests/SubprocessTests/SubprocessTests+Linux.swift index 3324343..fc801d6 100644 --- a/Tests/SubprocessTests/SubprocessTests+Linux.swift +++ b/Tests/SubprocessTests/SubprocessTests+Linux.swift @@ -27,7 +27,7 @@ import Testing // MARK: PlatformOption Tests @Suite(.serialized) struct SubprocessLinuxTests { - @Test func testSubprocessPlatfomOptionsPreSpawnProcessConfigurator() async throws { + @Test func testSubprocessPlatformOptionsPreSpawnProcessConfigurator() async throws { var platformOptions = PlatformOptions() platformOptions.preSpawnProcessConfigurator = { setgid(4321) @@ -77,7 +77,7 @@ struct SubprocessLinuxTests { arguments: ["infinity"], error: .discarded ) { subprocess, standardOutput in - // First suspend the procss + // First suspend the process try subprocess.send(signal: .suspend) #expect( try isProcessSuspended(subprocess.processIdentifier.value) diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index fdba78e..01f1735 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -391,7 +391,7 @@ extension SubprocessUnixTests { guard #available(SubprocessSpan , *) else { return } - // Maeks ure we can read long text as AsyncSequence + // Make sure we can read long text as AsyncSequence let fd: FileDescriptor = try .open(theMysteriousIsland, .readOnly) let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) @@ -440,7 +440,7 @@ extension SubprocessUnixTests { } @Test func testInputAsyncSequenceCustomExecutionBody() async throws { - // Maeks ure we can read long text as AsyncSequence + // Make sure we can read long text as AsyncSequence let fd: FileDescriptor = try .open(theMysteriousIsland, .readOnly) let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) @@ -486,7 +486,7 @@ extension SubprocessUnixTests { output: .discard ) #expect(echoResult.terminationStatus.isSuccess) - _ = echoResult.standardOutput // this line shold fatalError + _ = echoResult.standardOutput // this line should fatalError } #endif @@ -526,7 +526,7 @@ extension SubprocessUnixTests { #expect(String(expected[targetRange]) == output) } - @Test func testCollectedOutputFileDesriptor() async throws { + @Test func testCollectedOutputFileDescriptor() async throws { guard #available(SubprocessSpan , *) else { return } @@ -600,7 +600,7 @@ extension SubprocessUnixTests { } @Test func testRedirectedOutputWithUnsafeBytes() async throws { - // Make ure we can read long text redirected to AsyncSequence + // Make sure we can read long text redirected to AsyncSequence let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) ) @@ -622,7 +622,7 @@ extension SubprocessUnixTests { #if SubprocessSpan @Test func testRedirectedOutputBytes() async throws { - // Make ure we can read long text redirected to AsyncSequence + // Make sure we can read long text redirected to AsyncSequence let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) ) @@ -632,7 +632,7 @@ extension SubprocessUnixTests { ) { (execution: Execution, standardOutput: AsyncBufferSequence) -> Data in var buffer: Data = Data() for try await chunk in standardOutput { - buffer += Data(bytes: chunk.bytes) + buffer += chunk.withUnsafeBytes { Data(bytes: $0.baseAddress!, count: chunk.count) } } return buffer } @@ -662,7 +662,7 @@ extension SubprocessUnixTests { guard #available(SubprocessSpan , *) else { return } - // Make ure we can capture long text on standard error + // Make sure we can capture long text on standard error let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) ) @@ -739,7 +739,7 @@ extension SubprocessUnixTests { "This test requires root privileges" ) ) - func testSubprocssPlatformOptionsSuplimentaryGroups() async throws { + func testSubprocessPlatformOptionsSupplementaryGroups() async throws { guard #available(SubprocessSpan , *) else { return } @@ -808,8 +808,8 @@ extension SubprocessUnixTests { // platformOptions.createSession implies calls to setsid var platformOptions = PlatformOptions() platformOptions.createSession = true - // Check the proces ID (pid), pross group ID (pgid), and - // controling terminal's process group ID (tpgid) + // Check the process ID (pid), process group ID (pgid), and + // controlling terminal's process group ID (tpgid) let psResult = try await Subprocess.run( .path("/bin/bash"), arguments: ["-c", "ps -o pid,pgid,tpgid -p $$"], @@ -908,7 +908,7 @@ extension SubprocessUnixTests { for try await _ in standardOutput {} } guard case .unhandledException(let exception) = stuckResult.terminationStatus else { - Issue.record("Wrong termination status repored: \(stuckResult.terminationStatus)") + Issue.record("Wrong termination status reported: \(stuckResult.terminationStatus)") return } #expect(exception == Signal.terminate.rawValue) @@ -953,7 +953,7 @@ extension SubprocessUnixTests { return result } } - preconditionFailure("Task shold have returned a result") + preconditionFailure("Task should have returned a result") } #expect(result == .unhandledException(SIGKILL)) } @@ -977,7 +977,7 @@ extension SubprocessUnixTests { [0xE2, 0x80, 0xA9] // Paragraph separator ] - // Geneate test cases + // Generate test cases func generateString(size: TestCaseSize) -> [UInt8] { // Basic Latin has the range U+0020 ... U+007E let range: ClosedRange = 0x20 ... 0x7E // 0x4E ... 0x5A @@ -1008,7 +1008,7 @@ extension SubprocessUnixTests { let remaining = count - targetSizes.count let rest = TestCaseSize.allCases.shuffled().prefix(remaining) targetSizes.append(contentsOf: rest) - // Do a final shuffle to achiave random order + // Do a final shuffle to achieve random order targetSizes.shuffle() // Now generate test cases based on sizes var testCases: [TestCase] = [] @@ -1061,7 +1061,7 @@ extension SubprocessUnixTests { #expect( line == testCases[index].value, """ - Found mistachig line at index \(index) + Found mismatching line at index \(index) Expected: [\(testCases[index].value)] Actual: [\(line)] Line Ending \(Array(testCases[index].newLine.utf8)) @@ -1111,7 +1111,7 @@ internal func assertNewSessionCreated( result.standardOutput ) let match = try #require(try #/\s*PID\s*PGID\s*TPGID\s*(?[\-]?[0-9]+)\s*(?[\-]?[0-9]+)\s*(?[\-]?[0-9]+)\s*/#.wholeMatch(in: psValue), "ps output was in an unexpected format:\n\n\(psValue)") - // If setsid() has been called successfully, we shold observe: + // If setsid() has been called successfully, we should observe: // - pid == pgid // - tpgid <= 0 let pid = try #require(Int(match.output.pid)) diff --git a/Tests/SubprocessTests/SubprocessTests+Windows.swift b/Tests/SubprocessTests/SubprocessTests+Windows.swift index 2226cc2..0f78a41 100644 --- a/Tests/SubprocessTests/SubprocessTests+Windows.swift +++ b/Tests/SubprocessTests/SubprocessTests+Windows.swift @@ -94,7 +94,7 @@ extension SubprocessWindowsTests { do { // Since we are using the path directly, // we expect the error to be thrown by the underlying - // CreateProcssW + // CreateProcessW _ = try await Subprocess.run(.path("X:\\do-not-exist")) Issue.record("Expected to throw POSIXError") } catch { @@ -322,7 +322,7 @@ extension SubprocessWindowsTests { #endif @Test func testInputAsyncSequence() async throws { let chunkSize = 4096 - // Maeks ure we can read long text as AsyncSequence + // Make sure we can read long text as AsyncSequence let fd: FileDescriptor = try .open(theMysteriousIsland, .readOnly) let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) @@ -380,7 +380,7 @@ extension SubprocessWindowsTests { @available(SubprocessSpan, *) #endif @Test func testInputAsyncSequenceCustomExecutionBody() async throws { - // Maeks ure we can read long text as AsyncSequence + // Make sure we can read long text as AsyncSequence let chunkSize = 4096 let fd: FileDescriptor = try .open(theMysteriousIsland, .readOnly) let expected: Data = try Data( @@ -458,7 +458,7 @@ extension SubprocessWindowsTests { #if SubprocessSpan @available(SubprocessSpan, *) #endif - @Test func testCollectedOutputFileDesriptor() async throws { + @Test func testCollectedOutputFileDescriptor() async throws { let outputFilePath = FilePath( FileManager.default.temporaryDirectory._fileSystemPath ).appending("Test.out") @@ -493,7 +493,7 @@ extension SubprocessWindowsTests { @available(SubprocessSpan, *) #endif @Test func testRedirectedOutputRedirectToSequence() async throws { - // Maeks ure we can read long text redirected to AsyncSequence + // Make sure we can read long text redirected to AsyncSequence let expected: Data = try Data( contentsOf: URL(filePath: theMysteriousIsland.string) ) @@ -546,7 +546,7 @@ extension SubprocessWindowsTests { // whoami returns `computerName\userName`. let userInfo = result.split(separator: "\\") guard userInfo.count == 2 else { - Issue.record("Fail to parse the restult for whoami: \(result)") + Issue.record("Fail to parse the result for whoami: \(result)") return } #expect( @@ -576,7 +576,7 @@ extension SubprocessWindowsTests { #expect( "\(intptr_t(bitPattern: parentConsole))" == sameConsoleValue ) - // Now launch a procss with new console + // Now launch a process with new console var platformOptions: Subprocess.PlatformOptions = .init() platformOptions.consoleBehavior = .createNew let differentConsoleResult = try await Subprocess.run( @@ -603,7 +603,7 @@ extension SubprocessWindowsTests { #endif @Test func testPlatformOptionsDetachedProcess() async throws { var platformOptions: Subprocess.PlatformOptions = .init() - platformOptions.consoleBehavior = .detatch + platformOptions.consoleBehavior = .detach let detachConsoleResult = try await Subprocess.run( .path("C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\powershell.exe"), arguments: [ @@ -617,7 +617,7 @@ extension SubprocessWindowsTests { let detachConsoleValue = try #require( detachConsoleResult.standardOutput ).trimmingCharacters(in: .whitespacesAndNewlines) - // Detached process shoud NOT have a console + // Detached process should NOT have a console #expect(detachConsoleValue.isEmpty) } @@ -714,8 +714,8 @@ extension SubprocessWindowsTests { error: .discarded ) { subprocess, standardOutput in try subprocess.suspend() - // Now check the to make sure the procss is actually suspended - // Why not spawn a nother process to do that? + // Now check the to make sure the process is actually suspended + // Why not spawn another process to do that? var checkResult = try await Subprocess.run( .path("C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\powershell.exe"), arguments: [ @@ -772,7 +772,7 @@ extension SubprocessWindowsTests { ], output: writeFd ) - // Wait for procss to finish + // Wait for process to finish guard let processHandle = OpenProcess( DWORD(PROCESS_QUERY_INFORMATION | SYNCHRONIZE), @@ -898,7 +898,7 @@ extension SubprocessWindowsTests { extension FileDescriptor { internal func readUntilEOF(upToLength maxLength: Int) async throws -> Data { - // TODO: Figure out a better way to asynchornously read + // TODO: Figure out a better way to asynchronously read return try await withCheckedThrowingContinuation { continuation in DispatchQueue.global(qos: .userInitiated).async { var totalBytesRead: Int = 0 @@ -933,7 +933,7 @@ extension FileDescriptor { } break } else { - // We succesfully read the current round + // We successfully read the current round totalBytesRead += Int(bytesRead) } From 195fcee9c15b0c6e7831afdf5030e94043c56264 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Thu, 5 Jun 2025 16:59:20 -0700 Subject: [PATCH 8/8] LineSequence: throw error when max line length exceeded instead of returning partial (and potentially incorrect) data --- Sources/Subprocess/AsyncBufferSequence.swift | 94 +++++++++++-------- Sources/Subprocess/Buffer.swift | 23 ++--- Sources/Subprocess/Error.swift | 17 ++-- Sources/Subprocess/IO/Output.swift | 17 +--- Sources/Subprocess/Teardown.swift | 4 +- .../SubprocessTests+Unix.swift | 5 +- 6 files changed, 85 insertions(+), 75 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index ed1979e..80d3c9f 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -81,9 +81,10 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { return Iterator(diskIO: self.diskIO) } + // [New API: 0.0.1] public func lines( encoding: Encoding.Type = UTF8.self, - bufferingPolicy: LineSequence.BufferingPolicy = .unbounded + bufferingPolicy: LineSequence.BufferingPolicy = .maxLineLength(128 * 1024) ) -> LineSequence { return LineSequence(underlying: self, encoding: encoding, bufferingPolicy: bufferingPolicy) } @@ -91,6 +92,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { // MARK: - LineSequence extension AsyncBufferSequence { + // [New API: 0.0.1] public struct LineSequence: AsyncSequence, Sendable { public typealias Element = String @@ -140,25 +142,20 @@ extension AsyncBufferSequence { } #else // Unfortunately here we _have to_ copy the bytes out because - // DisptachIO (rightfully) reuses buffer, which means `buffer.data` + // DispatchIO (rightfully) reuses buffer, which means `buffer.data` // has the same address on all iterations, therefore we can't directly // create the result array from buffer.data - let temporary = UnsafeMutableBufferPointer.allocate( - capacity: buffer.data.count - ) - defer { temporary.deallocate() } - let actualBytesCopied = buffer.data.copyBytes( - to: temporary, - count: buffer.data.count - ) // Calculate how many CodePoint elements we have - let elementCount = actualBytesCopied / MemoryLayout.stride + let elementCount = buffer.data.count / MemoryLayout.stride // Create array by copying from the buffer reinterpreted as CodePoint - let result: Array = Array( - UnsafeBufferPointer(start: temporary.baseAddress, count: elementCount) - ) + let result: Array = buffer.data.withUnsafeBytes { ptr -> Array in + return Array( + UnsafeBufferPointer(start: ptr.baseAddress?.assumingMemoryBound(to: Encoding.CodeUnit.self), count: elementCount) + ) + } + #endif return result.isEmpty ? nil : result } @@ -180,18 +177,38 @@ extension AsyncBufferSequence { /// let formFeed = Encoding.CodeUnit(0x0C) let carriageReturn = Encoding.CodeUnit(0x0D) // carriageReturn + lineFeed - let newLine: Encoding.CodeUnit - let lineSeparator: Encoding.CodeUnit - let paragraphSeparator: Encoding.CodeUnit + let newLine1: Encoding.CodeUnit + let newLine2: Encoding.CodeUnit + let lineSeparator1: Encoding.CodeUnit + let lineSeparator2: Encoding.CodeUnit + let lineSeparator3: Encoding.CodeUnit + let paragraphSeparator1: Encoding.CodeUnit + let paragraphSeparator2: Encoding.CodeUnit + let paragraphSeparator3: Encoding.CodeUnit switch Encoding.CodeUnit.self { case is UInt8.Type: - newLine = Encoding.CodeUnit(0xC2) // 0xC2 0x85 - lineSeparator = Encoding.CodeUnit(0xE2) // 0xE2 0x80 0xA8 - paragraphSeparator = Encoding.CodeUnit(0xE2) // 0xE2 0x80 0xA9 + newLine1 = Encoding.CodeUnit(0xC2) + newLine2 = Encoding.CodeUnit(0x85) + + lineSeparator1 = Encoding.CodeUnit(0xE2) + lineSeparator2 = Encoding.CodeUnit(0x80) + lineSeparator3 = Encoding.CodeUnit(0xA8) + + paragraphSeparator1 = Encoding.CodeUnit(0xE2) + paragraphSeparator2 = Encoding.CodeUnit(0x80) + paragraphSeparator3 = Encoding.CodeUnit(0xA9) case is UInt16.Type, is UInt32.Type: - newLine = Encoding.CodeUnit(0x0085) - lineSeparator = Encoding.CodeUnit(0x2028) - paragraphSeparator = Encoding.CodeUnit(0x2029) + // UTF16 and UTF32 use one byte for all + newLine1 = Encoding.CodeUnit(0x0085) + newLine2 = Encoding.CodeUnit(0x0085) + + lineSeparator1 = Encoding.CodeUnit(0x2028) + lineSeparator2 = Encoding.CodeUnit(0x2028) + lineSeparator3 = Encoding.CodeUnit(0x2028) + + paragraphSeparator1 = Encoding.CodeUnit(0x2029) + paragraphSeparator2 = Encoding.CodeUnit(0x2029) + paragraphSeparator3 = Encoding.CodeUnit(0x2029) default: fatalError("Unknown encoding type \(Encoding.self)") } @@ -210,10 +227,13 @@ extension AsyncBufferSequence { var currentIndex: Int = self.startIndex for index in self.startIndex ..< self.buffer.count { currentIndex = index - // Early return if we exceed max line length + // Throw if we exceed max line length if case .maxLineLength(let maxLength) = self.bufferingPolicy, currentIndex >= maxLength { - return yield(at: currentIndex) + throw SubprocessError( + code: .init(.streamOutputExceedsLimit(maxLength)), + underlyingError: nil + ) } let byte = self.buffer[currentIndex] switch byte { @@ -232,12 +252,12 @@ extension AsyncBufferSequence { continue } return result - case newLine: + case newLine1: var targetIndex = currentIndex if Encoding.CodeUnit.self is UInt8.Type { // For UTF8, look for the next 0x85 byte guard (targetIndex + 1) < self.buffer.count, - self.buffer[targetIndex + 1] == Encoding.CodeUnit(0x85) else { + self.buffer[targetIndex + 1] == newLine2 else { // Not a valid new line. Keep looking continue } @@ -248,21 +268,22 @@ extension AsyncBufferSequence { continue } return result - case lineSeparator, paragraphSeparator: + case lineSeparator1, paragraphSeparator1: var targetIndex = currentIndex if Encoding.CodeUnit.self is UInt8.Type { - // For UTF8, look for the next 0x80 byte + // For UTF8, look for the next byte guard (targetIndex + 1) < self.buffer.count, - self.buffer[targetIndex + 1] == Encoding.CodeUnit(0x80) else { + self.buffer[targetIndex + 1] == lineSeparator2 || + self.buffer[targetIndex + 1] == paragraphSeparator2 else { // Not a valid new line. Keep looking continue } - // Swallow 0x80 byte + // Swallow next byte targetIndex += 1 - // Look for the final 0xA8 (lineSeparator) or 0xA9 (paragraphSeparator) + // Look for the final byte guard (targetIndex + 1) < self.buffer.count, - (self.buffer[targetIndex + 1] == Encoding.CodeUnit(0xA8) || - self.buffer[targetIndex + 1] == Encoding.CodeUnit(0xA9)) else { + (self.buffer[targetIndex + 1] == lineSeparator3 || + self.buffer[targetIndex + 1] == paragraphSeparator3) else { // Not a valid new line. Keep looking continue } @@ -308,9 +329,8 @@ extension AsyncBufferSequence.LineSequence { /// on the number of buffered elements (line length). case unbounded /// Impose a max buffer size (line length) limit. - /// When using this policy, `LineSequence` will return a line if: - /// - A newline character is encountered (standard behavior) - /// - The current line in the buffer reaches or exceeds the specified maximum length + /// Subprocess **will throw an error** if the number of buffered + /// elements (line length) exceeds the limit case maxLineLength(Int) } } diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index 6ed9839..e2245e7 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -28,17 +28,17 @@ extension AsyncBufferSequence { return [.init(data: data)] } #else - // We need to keep the backingData alive while Slice is alive + // We need to keep the backingData alive while _ContiguousBufferView is alive internal let backingData: DispatchData - internal let data: DispatchData.Slice + internal let data: DispatchData._ContiguousBufferView - internal init(data: DispatchData.Slice, backingData: DispatchData) { + internal init(data: DispatchData._ContiguousBufferView, backingData: DispatchData) { self.data = data self.backingData = backingData } internal static func createFrom(_ data: DispatchData) -> [Buffer] { - let slices = data.slices + let slices = data.contiguousBufferViews // In most (all?) cases data should only have one slice if _fastPath(slices.count == 1) { return [.init(data: slices[0], backingData: data)] @@ -114,7 +114,7 @@ extension AsyncBufferSequence.Buffer: Equatable, Hashable { #if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl) extension DispatchData { /// Unfortunately `DispatchData.Region` is not available on Linux, hence our own wrapper - internal struct Slice: @unchecked Sendable, RandomAccessCollection { + internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection { typealias Element = UInt8 internal let bytes: UnsafeBufferPointer @@ -130,13 +130,6 @@ extension DispatchData { return try body(UnsafeRawBufferPointer(self.bytes)) } - @discardableResult - internal func copyBytes( - to ptr: UnsafeMutableBufferPointer, count: Int - ) -> Int { - self.bytes.copyBytes(to: ptr, count: count) - } - subscript(position: Int) -> UInt8 { _read { yield self.bytes[position] @@ -144,10 +137,10 @@ extension DispatchData { } } - internal var slices: [Slice] { - var slices = [Slice]() + internal var contiguousBufferViews: [_ContiguousBufferView] { + var slices = [_ContiguousBufferView]() enumerateBytes { (bytes, index, stop) in - slices.append(Slice(bytes: bytes)) + slices.append(_ContiguousBufferView(bytes: bytes)) } return slices } diff --git a/Sources/Subprocess/Error.swift b/Sources/Subprocess/Error.swift index 870e72b..dde4468 100644 --- a/Sources/Subprocess/Error.swift +++ b/Sources/Subprocess/Error.swift @@ -40,6 +40,7 @@ extension SubprocessError { case failedToReadFromSubprocess case failedToWriteToSubprocess case failedToMonitorProcess + case streamOutputExceedsLimit(Int) // Signal case failedToSendSignal(Int32) // Windows Only @@ -64,18 +65,20 @@ extension SubprocessError { return 4 case .failedToMonitorProcess: return 5 - case .failedToSendSignal(_): + case .streamOutputExceedsLimit(_): return 6 - case .failedToTerminate: + case .failedToSendSignal(_): return 7 - case .failedToSuspend: + case .failedToTerminate: return 8 - case .failedToResume: + case .failedToSuspend: return 9 - case .failedToCreatePipe: + case .failedToResume: return 10 - case .invalidWindowsPath(_): + case .failedToCreatePipe: return 11 + case .invalidWindowsPath(_): + return 12 } } @@ -103,6 +106,8 @@ extension SubprocessError: CustomStringConvertible, CustomDebugStringConvertible return "Failed to write bytes to the child process." case .failedToMonitorProcess: return "Failed to monitor the state of child process with underlying error: \(self.underlyingError!)" + case .streamOutputExceedsLimit(let limit): + return "Failed to create output from current buffer because the output limit (\(limit)) was reached." case .failedToSendSignal(let signal): return "Failed to send signal \(signal) to the child process." case .failedToTerminate: diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index 77aead8..4af3838 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -144,14 +144,10 @@ public struct BytesOutput: OutputProtocol { internal func captureOutput( from diskIO: consuming TrackedPlatformDiskIO? ) async throws -> [UInt8] { - guard let diskIO = diskIO else { - // Show not happen due to type system constraints - fatalError("Trying to capture output without file descriptor") - } #if os(Windows) - return try await diskIO.fileDescriptor.read(upToLength: self.maxSize) ?? [] + return try await diskIO?.fileDescriptor.read(upToLength: self.maxSize) ?? [] #else - let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize) + let result = try await diskIO!.dispatchIO.read(upToLength: self.maxSize) return result?.array() ?? [] #endif } @@ -270,16 +266,11 @@ extension OutputProtocol { return () as! OutputType } - guard let diskIO = diskIO else { - // Show not happen due to type system constraints - fatalError("Trying to capture output without file descriptor") - } - #if os(Windows) - let result = try await diskIO.fileDescriptor.read(upToLength: self.maxSize) + let result = try await diskIO?.fileDescriptor.read(upToLength: self.maxSize) return try self.output(from: result ?? []) #else - let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize) + let result = try await diskIO!.dispatchIO.read(upToLength: self.maxSize) return try self.output(from: result ?? .empty) #endif } diff --git a/Sources/Subprocess/Teardown.swift b/Sources/Subprocess/Teardown.swift index f9d1146..ec452d7 100644 --- a/Sources/Subprocess/Teardown.swift +++ b/Sources/Subprocess/Teardown.swift @@ -85,7 +85,7 @@ extension Execution { using sequence: some Sequence & Sendable, on processIdentifier: ProcessIdentifier ) async { - await withUncanceledTask { + await withUncancelledTask { await Self.runTeardownSequence(sequence, on: processIdentifier) } } @@ -222,7 +222,7 @@ extension Execution { } } -func withUncanceledTask( +func withUncancelledTask( returning: Result.Type = Result.self, _ body: @Sendable @escaping () async -> Result ) async -> Result { diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 01f1735..32c940c 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -980,7 +980,7 @@ extension SubprocessUnixTests { // Generate test cases func generateString(size: TestCaseSize) -> [UInt8] { // Basic Latin has the range U+0020 ... U+007E - let range: ClosedRange = 0x20 ... 0x7E // 0x4E ... 0x5A + let range: ClosedRange = 0x20 ... 0x7E let length: Int switch size { @@ -1044,7 +1044,7 @@ extension SubprocessUnixTests { } let testCaseCount = 60 - let testFilePath = URL.temporaryDirectory.appending(path: "NewLines.txt") + let testFilePath = URL.temporaryDirectory.appending(path: "NewLines-\(UUID().uuidString).txt") if FileManager.default.fileExists(atPath: testFilePath.path()) { try FileManager.default.removeItem(at: testFilePath) } @@ -1070,6 +1070,7 @@ extension SubprocessUnixTests { index += 1 } } + try FileManager.default.removeItem(at: testFilePath) } }