Skip to content

Fix issues with AsyncBufferSequence.LineSequence #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 83 additions & 89 deletions Sources/Subprocess/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ extension AsyncBufferSequence {

private var source: AsyncBufferSequence.AsyncIterator
private var buffer: [Encoding.CodeUnit]
private var underlyingBuffer: [Encoding.CodeUnit]
private var leftover: Encoding.CodeUnit?
private var eofReached: Bool
private var startIndex: Int
private let bufferingPolicy: BufferingPolicy

internal init(
Expand All @@ -114,8 +115,9 @@ extension AsyncBufferSequence {
) {
self.source = underlyingIterator
self.buffer = []
self.underlyingBuffer = []
self.leftover = nil
self.eofReached = false
self.startIndex = 0
self.bufferingPolicy = bufferingPolicy
}

Expand Down Expand Up @@ -160,15 +162,32 @@ extension AsyncBufferSequence {
return result.isEmpty ? nil : result
}

func yield(at endIndex: Int) -> String? {
func yield() -> String? {
defer {
self.buffer.removeFirst(endIndex)
self.startIndex = 0
self.buffer.removeAll(keepingCapacity: true)
}
if self.buffer.isEmpty {
return nil
}
return String(decoding: self.buffer[0 ..< endIndex], as: Encoding.self)
return String(decoding: self.buffer, as: Encoding.self)
}

func nextFromSource() async throws -> Encoding.CodeUnit? {
if underlyingBuffer.isEmpty {
guard let buf = try await loadBuffer() else {
return nil
}
underlyingBuffer = buf
}
return underlyingBuffer.removeFirst()
}

func nextCodeUnit() async throws -> Encoding.CodeUnit? {
defer { leftover = nil }
if let leftover = leftover {
return leftover
}
return try await nextFromSource()
}

// https://en.wikipedia.org/wiki/Newline#Unicode
Expand Down Expand Up @@ -213,95 +232,70 @@ extension AsyncBufferSequence {
fatalError("Unknown encoding type \(Encoding.self)")
}

while true {
// Step 1: Load more buffer if needed
if self.startIndex >= self.buffer.count {
guard let nextBuffer = try await loadBuffer() else {
// We have no more data
// Return the remaining data
return yield(at: self.buffer.count)
}
self.buffer += nextBuffer
while let first = try await nextCodeUnit() {
// Throw if we exceed max line length
if case .maxLineLength(let maxLength) = self.bufferingPolicy, buffer.count >= maxLength {
throw SubprocessError(
code: .init(.streamOutputExceedsLimit(maxLength)),
underlyingError: nil
)
}
// Step 2: Iterate through buffer to find next line
var currentIndex: Int = self.startIndex
for index in self.startIndex ..< self.buffer.count {
currentIndex = index
// Throw if we exceed max line length
if case .maxLineLength(let maxLength) = self.bufferingPolicy,
currentIndex >= maxLength {
throw SubprocessError(
code: .init(.streamOutputExceedsLimit(maxLength)),
underlyingError: nil
)

buffer.append(first)
switch first {
case carriageReturn:
// Swallow up any subsequent LF
guard let next = try await nextFromSource() else {
return yield() // if we ran out of bytes, the last byte was a CR
}
let byte = self.buffer[currentIndex]
switch byte {
case carriageReturn:
// Swallow any subsequent lineFeed if there is one
var targetIndex = currentIndex
if (currentIndex + 1) < self.buffer.count, self.buffer[currentIndex + 1] == lineFeed {
targetIndex = currentIndex + 1
}
guard let result = yield(at: targetIndex + 1) else {
continue
}
return result
case lineFeed ..< carriageReturn:
guard let result = yield(at: currentIndex + 1) else {
continue
}
return result
case newLine1:
var targetIndex = currentIndex
if Encoding.CodeUnit.self is UInt8.Type {
// For UTF8, look for the next 0x85 byte
guard (targetIndex + 1) < self.buffer.count,
self.buffer[targetIndex + 1] == newLine2 else {
// Not a valid new line. Keep looking
continue
}
// Swallow 0x85 byte
targetIndex += 1
}
guard let result = yield(at: targetIndex + 1) else {
continue
}
return result
case lineSeparator1, paragraphSeparator1:
var targetIndex = currentIndex
if Encoding.CodeUnit.self is UInt8.Type {
// For UTF8, look for the next byte
guard (targetIndex + 1) < self.buffer.count,
self.buffer[targetIndex + 1] == lineSeparator2 ||
self.buffer[targetIndex + 1] == paragraphSeparator2 else {
// Not a valid new line. Keep looking
continue
}
// Swallow next byte
targetIndex += 1
// Look for the final byte
guard (targetIndex + 1) < self.buffer.count,
(self.buffer[targetIndex + 1] == lineSeparator3 ||
self.buffer[targetIndex + 1] == paragraphSeparator3) else {
// Not a valid new line. Keep looking
continue
}
// Swallow 0xA8 (or 0xA9) byte
targetIndex += 1
}
guard let result = yield(at: targetIndex + 1) else {
continue
}
return result
default:
// Keep searching
buffer.append(next)
guard next == lineFeed else {
// if the next character was not an LF, save it for the next iteration and still return a line
leftover = buffer.removeLast()
return yield()
}
return yield()
case newLine1 where Encoding.CodeUnit.self is UInt8.Type: // this may be used to compose other UTF8 characters
guard let next = try await nextFromSource() else {
// technically invalid UTF8 but it should be repaired to "\u{FFFD}"
return yield()
}
buffer.append(next)
guard next == newLine2 else {
continue
}
return yield()
case lineSeparator1 where Encoding.CodeUnit.self is UInt8.Type,
paragraphSeparator1 where Encoding.CodeUnit.self is UInt8.Type:
// Try to read: 80 [A8 | A9].
// If we can't, then we put the byte in the buffer for error correction
guard let next = try await nextFromSource() else {
return yield()
}
buffer.append(next)
guard next == lineSeparator2 || next == paragraphSeparator2 else {
continue
}
guard let fin = try await nextFromSource() else {
return yield()
}
buffer.append(fin)
guard fin == lineSeparator3 || fin == paragraphSeparator3 else {
continue
}
return yield()
case lineFeed ..< carriageReturn, newLine1, lineSeparator1, paragraphSeparator1:
return yield()
default:
continue
}
// There is no new line in the buffer. Load more buffer and try again
self.startIndex = currentIndex + 1
}

// Don't emit an empty newline when there is no more content (e.g. end of file)
if !buffer.isEmpty {
return yield()
}
return nil
}
}

Expand Down
24 changes: 11 additions & 13 deletions Tests/SubprocessTests/SubprocessTests+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -760,15 +760,8 @@ extension SubprocessUnixTests {
}
group.addTask {
var outputs: [String] = []
for try await bit in standardOutput {
let bitString = bit.withUnsafeBytes { ptr in
return String(decoding: ptr, as: UTF8.self)
}.trimmingCharacters(in: .whitespacesAndNewlines)
if bitString.contains("\n") {
outputs.append(contentsOf: bitString.split(separator: "\n").map { String($0) })
} else {
outputs.append(bitString)
}
for try await line in standardOutput.lines() {
outputs.append(line.trimmingCharacters(in: .newlines))
}
#expect(outputs == ["saw SIGQUIT", "saw SIGTERM", "saw SIGINT"])
}
Expand Down Expand Up @@ -881,17 +874,21 @@ extension SubprocessUnixTests {
let length: Int
switch size {
case .large:
length = Int(Double.random(in: 1.0 ..< 2.0) * Double(readBufferSize))
length = Int(Double.random(in: 1.0 ..< 2.0) * Double(readBufferSize)) + 1
case .medium:
length = Int(Double.random(in: 0.2 ..< 1.0) * Double(readBufferSize))
length = Int(Double.random(in: 0.2 ..< 1.0) * Double(readBufferSize)) + 1
case .small:
length = Int.random(in: 0 ..< 16)
length = Int.random(in: 1 ..< 16)
}

var buffer: [UInt8] = Array(repeating: 0, count: length)
for index in 0 ..< length {
buffer[index] = UInt8.random(in: range)
}
// Buffer cannot be empty or a line with a \r ending followed by an empty one with a \n ending would be indistinguishable.
// This matters for any line ending sequences where one line ending sequence is the prefix of another. \r and \r\n are the
// only two which meet this criteria.
precondition(!buffer.isEmpty)
return buffer
}

Expand Down Expand Up @@ -954,6 +951,8 @@ extension SubprocessUnixTests {
) { execution, standardOutput in
var index = 0
for try await line in standardOutput.lines(encoding: UTF8.self) {
defer { index += 1 }
try #require(index < testCases.count, "Received more lines than expected")
#expect(
line == testCases[index].value,
"""
Expand All @@ -963,7 +962,6 @@ extension SubprocessUnixTests {
Line Ending \(Array(testCases[index].newLine.utf8))
"""
)
index += 1
}
}
try FileManager.default.removeItem(at: testFilePath)
Expand Down
Loading