diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 80d3c9f..39fb38b 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -104,8 +104,9 @@ extension AsyncBufferSequence { private var source: AsyncBufferSequence.AsyncIterator private var buffer: [Encoding.CodeUnit] + private var underlyingBuffer: [Encoding.CodeUnit] + private var leftover: Encoding.CodeUnit? private var eofReached: Bool - private var startIndex: Int private let bufferingPolicy: BufferingPolicy internal init( @@ -114,8 +115,9 @@ extension AsyncBufferSequence { ) { self.source = underlyingIterator self.buffer = [] + self.underlyingBuffer = [] + self.leftover = nil self.eofReached = false - self.startIndex = 0 self.bufferingPolicy = bufferingPolicy } @@ -160,15 +162,32 @@ extension AsyncBufferSequence { return result.isEmpty ? nil : result } - func yield(at endIndex: Int) -> String? { + func yield() -> String? { defer { - self.buffer.removeFirst(endIndex) - self.startIndex = 0 + self.buffer.removeAll(keepingCapacity: true) } if self.buffer.isEmpty { return nil } - return String(decoding: self.buffer[0 ..< endIndex], as: Encoding.self) + return String(decoding: self.buffer, as: Encoding.self) + } + + func nextFromSource() async throws -> Encoding.CodeUnit? { + if underlyingBuffer.isEmpty { + guard let buf = try await loadBuffer() else { + return nil + } + underlyingBuffer = buf + } + return underlyingBuffer.removeFirst() + } + + func nextCodeUnit() async throws -> Encoding.CodeUnit? { + defer { leftover = nil } + if let leftover = leftover { + return leftover + } + return try await nextFromSource() } // https://en.wikipedia.org/wiki/Newline#Unicode @@ -213,95 +232,70 @@ extension AsyncBufferSequence { fatalError("Unknown encoding type \(Encoding.self)") } - while true { - // Step 1: Load more buffer if needed - if self.startIndex >= self.buffer.count { - guard let nextBuffer = try await loadBuffer() else { - // We have no more data - // Return the remaining data - return yield(at: self.buffer.count) - } - self.buffer += nextBuffer + while let first = try await nextCodeUnit() { + // Throw if we exceed max line length + if case .maxLineLength(let maxLength) = self.bufferingPolicy, buffer.count >= maxLength { + throw SubprocessError( + code: .init(.streamOutputExceedsLimit(maxLength)), + underlyingError: nil + ) } - // Step 2: Iterate through buffer to find next line - var currentIndex: Int = self.startIndex - for index in self.startIndex ..< self.buffer.count { - currentIndex = index - // Throw if we exceed max line length - if case .maxLineLength(let maxLength) = self.bufferingPolicy, - currentIndex >= maxLength { - throw SubprocessError( - code: .init(.streamOutputExceedsLimit(maxLength)), - underlyingError: nil - ) + + buffer.append(first) + switch first { + case carriageReturn: + // Swallow up any subsequent LF + guard let next = try await nextFromSource() else { + return yield() // if we ran out of bytes, the last byte was a CR } - let byte = self.buffer[currentIndex] - switch byte { - case carriageReturn: - // Swallow any subsequent lineFeed if there is one - var targetIndex = currentIndex - if (currentIndex + 1) < self.buffer.count, self.buffer[currentIndex + 1] == lineFeed { - targetIndex = currentIndex + 1 - } - guard let result = yield(at: targetIndex + 1) else { - continue - } - return result - case lineFeed ..< carriageReturn: - guard let result = yield(at: currentIndex + 1) else { - continue - } - return result - case newLine1: - var targetIndex = currentIndex - if Encoding.CodeUnit.self is UInt8.Type { - // For UTF8, look for the next 0x85 byte - guard (targetIndex + 1) < self.buffer.count, - self.buffer[targetIndex + 1] == newLine2 else { - // Not a valid new line. Keep looking - continue - } - // Swallow 0x85 byte - targetIndex += 1 - } - guard let result = yield(at: targetIndex + 1) else { - continue - } - return result - case lineSeparator1, paragraphSeparator1: - var targetIndex = currentIndex - if Encoding.CodeUnit.self is UInt8.Type { - // For UTF8, look for the next byte - guard (targetIndex + 1) < self.buffer.count, - self.buffer[targetIndex + 1] == lineSeparator2 || - self.buffer[targetIndex + 1] == paragraphSeparator2 else { - // Not a valid new line. Keep looking - continue - } - // Swallow next byte - targetIndex += 1 - // Look for the final byte - guard (targetIndex + 1) < self.buffer.count, - (self.buffer[targetIndex + 1] == lineSeparator3 || - self.buffer[targetIndex + 1] == paragraphSeparator3) else { - // Not a valid new line. Keep looking - continue - } - // Swallow 0xA8 (or 0xA9) byte - targetIndex += 1 - } - guard let result = yield(at: targetIndex + 1) else { - continue - } - return result - default: - // Keep searching + buffer.append(next) + guard next == lineFeed else { + // if the next character was not an LF, save it for the next iteration and still return a line + leftover = buffer.removeLast() + return yield() + } + return yield() + case newLine1 where Encoding.CodeUnit.self is UInt8.Type: // this may be used to compose other UTF8 characters + guard let next = try await nextFromSource() else { + // technically invalid UTF8 but it should be repaired to "\u{FFFD}" + return yield() + } + buffer.append(next) + guard next == newLine2 else { + continue + } + return yield() + case lineSeparator1 where Encoding.CodeUnit.self is UInt8.Type, + paragraphSeparator1 where Encoding.CodeUnit.self is UInt8.Type: + // Try to read: 80 [A8 | A9]. + // If we can't, then we put the byte in the buffer for error correction + guard let next = try await nextFromSource() else { + return yield() + } + buffer.append(next) + guard next == lineSeparator2 || next == paragraphSeparator2 else { continue } + guard let fin = try await nextFromSource() else { + return yield() + } + buffer.append(fin) + guard fin == lineSeparator3 || fin == paragraphSeparator3 else { + continue + } + return yield() + case lineFeed ..< carriageReturn, newLine1, lineSeparator1, paragraphSeparator1: + return yield() + default: + continue } - // There is no new line in the buffer. Load more buffer and try again - self.startIndex = currentIndex + 1 } + + // Don't emit an empty newline when there is no more content (e.g. end of file) + if !buffer.isEmpty { + return yield() + } + return nil } } diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 582af77..0476577 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -760,15 +760,8 @@ extension SubprocessUnixTests { } group.addTask { var outputs: [String] = [] - for try await bit in standardOutput { - let bitString = bit.withUnsafeBytes { ptr in - return String(decoding: ptr, as: UTF8.self) - }.trimmingCharacters(in: .whitespacesAndNewlines) - if bitString.contains("\n") { - outputs.append(contentsOf: bitString.split(separator: "\n").map { String($0) }) - } else { - outputs.append(bitString) - } + for try await line in standardOutput.lines() { + outputs.append(line.trimmingCharacters(in: .newlines)) } #expect(outputs == ["saw SIGQUIT", "saw SIGTERM", "saw SIGINT"]) } @@ -881,17 +874,21 @@ extension SubprocessUnixTests { let length: Int switch size { case .large: - length = Int(Double.random(in: 1.0 ..< 2.0) * Double(readBufferSize)) + length = Int(Double.random(in: 1.0 ..< 2.0) * Double(readBufferSize)) + 1 case .medium: - length = Int(Double.random(in: 0.2 ..< 1.0) * Double(readBufferSize)) + length = Int(Double.random(in: 0.2 ..< 1.0) * Double(readBufferSize)) + 1 case .small: - length = Int.random(in: 0 ..< 16) + length = Int.random(in: 1 ..< 16) } var buffer: [UInt8] = Array(repeating: 0, count: length) for index in 0 ..< length { buffer[index] = UInt8.random(in: range) } + // Buffer cannot be empty or a line with a \r ending followed by an empty one with a \n ending would be indistinguishable. + // This matters for any line ending sequences where one line ending sequence is the prefix of another. \r and \r\n are the + // only two which meet this criteria. + precondition(!buffer.isEmpty) return buffer } @@ -954,6 +951,8 @@ extension SubprocessUnixTests { ) { execution, standardOutput in var index = 0 for try await line in standardOutput.lines(encoding: UTF8.self) { + defer { index += 1 } + try #require(index < testCases.count, "Received more lines than expected") #expect( line == testCases[index].value, """ @@ -963,7 +962,6 @@ extension SubprocessUnixTests { Line Ending \(Array(testCases[index].newLine.utf8)) """ ) - index += 1 } } try FileManager.default.removeItem(at: testFilePath)