Skip to content

Commit c3e85bb

Browse files
committed
Implement LineSequence
1 parent a774271 commit c3e85bb

File tree

2 files changed

+354
-0
lines changed

2 files changed

+354
-0
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,242 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
7474
/// Creates the iterator for this sequence, handing it this sequence's `diskIO`.
public func makeAsyncIterator() -> Iterator {
    Iterator(diskIO: diskIO)
}
77+
78+
/// Returns a view of this sequence as lines of text.
///
/// - Parameters:
///   - encoding: The Unicode encoding used to decode the buffered data.
///     Defaults to `UTF8`.
///   - bufferingPolicy: How much of an unterminated line may be buffered
///     before it is returned. Defaults to `.unbounded`.
/// - Returns: A `LineSequence` wrapping this sequence.
public func lines<Encoding: _UnicodeEncoding>(
    encoding: Encoding.Type = UTF8.self,
    bufferingPolicy: LineSequence<Encoding>.BufferingPolicy = .unbounded
) -> LineSequence<Encoding> {
    let lineSequence = LineSequence<Encoding>(
        underlying: self,
        encoding: encoding,
        bufferingPolicy: bufferingPolicy
    )
    return lineSequence
}
84+
}
85+
86+
// MARK: - LineSequence
87+
#if SubprocessSpan
88+
@available(SubprocessSpan, *)
89+
#endif
90+
extension AsyncBufferSequence {
91+
/// An asynchronous sequence of lines decoded from the chunks produced by an
/// `AsyncBufferSequence`.
///
/// Every element is one line *including* its terminator. The recognized
/// terminators are LF (U+000A), VT (U+000B), FF (U+000C), CR (U+000D),
/// CR LF, NEL (U+0085), LINE SEPARATOR (U+2028) and PARAGRAPH SEPARATOR
/// (U+2029). Trailing data that is not followed by a terminator is returned
/// as a final element once the underlying sequence ends.
public struct LineSequence<Encoding: _UnicodeEncoding>: AsyncSequence, Sendable {
    public typealias Element = String

    private let base: AsyncBufferSequence
    private let bufferingPolicy: BufferingPolicy

    /// Iterator that accumulates code units from the underlying iterator and
    /// cuts them into lines.
    public struct AsyncIterator: AsyncIteratorProtocol {
        public typealias Element = String

        private var source: AsyncBufferSequence.AsyncIterator
        /// Code units received so far that have not been returned yet.
        /// Index 0 is always the first unit of the line being assembled.
        private var buffer: [Encoding.CodeUnit]
        /// Set once the underlying iterator has returned `nil`.
        private var eofReached: Bool
        private let bufferingPolicy: BufferingPolicy

        internal init(
            underlyingIterator: AsyncBufferSequence.AsyncIterator,
            bufferingPolicy: BufferingPolicy
        ) {
            self.source = underlyingIterator
            self.buffer = []
            self.eofReached = false
            self.bufferingPolicy = bufferingPolicy
        }

        /// Returns the next line, terminator included, or `nil` once the
        /// underlying sequence is exhausted and every buffered unit has been
        /// returned.
        ///
        /// A terminator made of several code units (CR LF, and in UTF-8 also
        /// NEL / LINE SEPARATOR / PARAGRAPH SEPARATOR) may be split across
        /// two chunks. When the first unit of such a terminator is the last
        /// buffered unit, more data is loaded before deciding. This means a
        /// line ending in a bare CR is only returned once the following unit
        /// (or the end of the stream) has been seen.
        ///
        /// - Throws: Whatever the underlying iterator throws.
        public mutating func next() async throws -> String? {
            // https://en.wikipedia.org/wiki/Newline#Unicode
            let lineFeed = Encoding.CodeUnit(0x0A)
            // Vertical tab (0x0B) and form feed (0x0C) are covered by the
            // `lineFeed ..< carriageReturn` range below.
            let carriageReturn = Encoding.CodeUnit(0x0D)
            let isUTF8 = Encoding.CodeUnit.self is UInt8.Type

            // For UTF-8 these hold only the *lead* byte of the sequence; the
            // continuation bytes are verified in the switch below.
            let newLine: Encoding.CodeUnit
            let lineSeparator: Encoding.CodeUnit
            let paragraphSeparator: Encoding.CodeUnit
            switch Encoding.CodeUnit.self {
            case is UInt8.Type:
                newLine = Encoding.CodeUnit(0xC2)  // 0xC2 0x85
                lineSeparator = Encoding.CodeUnit(0xE2)  // 0xE2 0x80 0xA8
                paragraphSeparator = Encoding.CodeUnit(0xE2)  // 0xE2 0x80 0xA9
            case is UInt16.Type, is UInt32.Type:
                newLine = Encoding.CodeUnit(0x0085)
                lineSeparator = Encoding.CodeUnit(0x2028)
                paragraphSeparator = Encoding.CodeUnit(0x2029)
            default:
                fatalError("Unknown encoding type \(Encoding.self)")
            }

            // A non-positive limit would make every call return an empty
            // string without consuming anything, so clamp it to one unit.
            let lengthLimit: Int?
            switch self.bufferingPolicy {
            case .unbounded:
                lengthLimit = nil
            case .maxLineLength(let maxLength):
                lengthLimit = Swift.max(1, maxLength)
            }

            var index = 0
            while true {
                // Return early once the pending line has reached the limit.
                // `index` never exceeds `buffer.count` here because every
                // index below it has already been read from the buffer.
                if let limit = lengthLimit, index >= limit {
                    return self.yield(at: index)
                }
                guard try await self.fillBuffer(through: index) else {
                    // No more data: return whatever is left (or `nil`).
                    return self.yield(at: self.buffer.count)
                }

                switch self.buffer[index] {
                case carriageReturn:
                    // Swallow a directly following line feed, even when it
                    // arrives in the next chunk.
                    var endIndex = index + 1
                    if try await self.fillBuffer(through: endIndex),
                        self.buffer[endIndex] == lineFeed
                    {
                        endIndex += 1
                    }
                    return self.yield(at: endIndex)
                case lineFeed ..< carriageReturn:
                    return self.yield(at: index + 1)
                case newLine:
                    var endIndex = index + 1
                    if isUTF8 {
                        // 0xC2 only starts NEL when followed by 0x85.
                        guard try await self.fillBuffer(through: endIndex),
                            self.buffer[endIndex] == Encoding.CodeUnit(0x85)
                        else {
                            // Not a line terminator. Keep looking.
                            break
                        }
                        endIndex += 1
                    }
                    return self.yield(at: endIndex)
                case lineSeparator, paragraphSeparator:
                    var endIndex = index + 1
                    if isUTF8 {
                        // 0xE2 must be followed by 0x80 and then by 0xA8
                        // (line separator) or 0xA9 (paragraph separator).
                        guard try await self.fillBuffer(through: endIndex),
                            self.buffer[endIndex] == Encoding.CodeUnit(0x80),
                            try await self.fillBuffer(through: endIndex + 1),
                            self.buffer[endIndex + 1] == Encoding.CodeUnit(0xA8)
                                || self.buffer[endIndex + 1] == Encoding.CodeUnit(0xA9)
                        else {
                            // Not a line terminator. Keep looking.
                            break
                        }
                        endIndex += 2
                    }
                    return self.yield(at: endIndex)
                default:
                    // Keep searching
                    break
                }
                index += 1
            }
        }

        /// Loads chunks until `buffer` contains a unit at `index`.
        ///
        /// - Returns: `false` if the underlying iterator ended first.
        private mutating func fillBuffer(through index: Int) async throws -> Bool {
            while index >= self.buffer.count {
                guard let more = try await self.loadBuffer() else {
                    return false
                }
                self.buffer += more
            }
            return true
        }

        /// Reads the next non-empty chunk from the underlying iterator and
        /// converts it to code units.
        ///
        /// Empty chunks are skipped rather than mistaken for the end of the
        /// stream; only a `nil` from the underlying iterator marks EOF.
        ///
        /// NOTE(review): a chunk whose byte count is not a multiple of the
        /// code unit size loses its trailing bytes (as in the original
        /// implementation) — relevant for UTF-16/UTF-32 only.
        private mutating func loadBuffer() async throws -> [Encoding.CodeUnit]? {
            while !self.eofReached {
                guard let chunk = try await self.source.next() else {
                    self.eofReached = true
                    return nil
                }
                #if os(Windows)
                // Reinterpret the bytes as code units. Going through the
                // bound buffer (instead of force-unwrapping `baseAddress`)
                // keeps an empty chunk from trapping.
                let units: [Encoding.CodeUnit] = chunk.withUnsafeBytes { ptr in
                    Array(ptr.bindMemory(to: Encoding.CodeUnit.self))
                }
                #else
                // The bytes have to be copied out: per the original author,
                // DispatchIO reuses its buffer, so `chunk.data` has the same
                // address on every iteration and cannot back the result
                // array directly.
                let temporary = UnsafeMutableBufferPointer<Encoding.CodeUnit>.allocate(
                    capacity: chunk.data.count
                )
                defer { temporary.deallocate() }
                let bytesCopied = chunk.data.copyBytes(to: temporary)
                // Number of whole code units that were copied.
                let unitCount = bytesCopied / MemoryLayout<Encoding.CodeUnit>.stride
                let units: [Encoding.CodeUnit] = Array(
                    UnsafeBufferPointer(start: temporary.baseAddress, count: unitCount)
                )
                #endif
                if !units.isEmpty {
                    return units
                }
            }
            return nil
        }

        /// Removes the first `endIndex` units from `buffer` and returns them
        /// decoded as a string, or `nil` when the buffer is empty.
        private mutating func yield(at endIndex: Int) -> String? {
            guard !self.buffer.isEmpty else {
                return nil
            }
            let line = String(decoding: self.buffer[0 ..< endIndex], as: Encoding.self)
            self.buffer.removeFirst(endIndex)
            return line
        }
    }

    /// Creates an iterator over the lines of the underlying sequence.
    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(
            underlyingIterator: self.base.makeAsyncIterator(),
            bufferingPolicy: self.bufferingPolicy
        )
    }

    /// - Parameters:
    ///   - underlying: The sequence whose chunks are split into lines.
    ///   - encoding: Only used to select the generic `Encoding` parameter.
    ///   - bufferingPolicy: Limit applied to the length of a pending line.
    internal init(
        underlying: AsyncBufferSequence,
        encoding: Encoding.Type,
        bufferingPolicy: BufferingPolicy
    ) {
        self.base = underlying
        self.bufferingPolicy = bufferingPolicy
    }
}
297+
}
298+
299+
#if SubprocessSpan
300+
@available(SubprocessSpan, *)
301+
#endif
302+
extension AsyncBufferSequence.LineSequence {
303+
/// Decides how many code units of a single, not yet terminated line a
/// `LineSequence` may hold before handing them out.
public enum BufferingPolicy: Sendable {
    /// Never cut a line short: keep buffering until a line terminator
    /// (or the end of the data) is found, however long the line gets.
    case unbounded

    /// Cap the length of a buffered line at the associated value, counted
    /// in code units of the sequence's encoding.
    ///
    /// With this policy a line is returned when either
    /// - a line terminator is encountered (the standard behavior), or
    /// - the line being buffered reaches or exceeds the given length.
    case maxLineLength(Int)
}
77313
}
78314

79315
// MARK: - Page Size

Tests/SubprocessTests/SubprocessTests+Unix.swift

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -933,6 +933,124 @@ extension SubprocessUnixTests {
933933
}
934934
#expect(result == .unhandledException(SIGKILL))
935935
}
936+
937+
/// Writes randomly generated lines (random length, random line terminator)
/// to a temporary file, streams the file through `/bin/cat`, and checks that
/// `lines(encoding:)` returns exactly the lines that were written, each one
/// including its terminator.
@Test func testLineSequence() async throws {
    guard #available(SubprocessSpan , *) else {
        return
    }

    typealias TestCase = (value: String, count: Int, newLine: String)
    enum TestCaseSize: CaseIterable {
        case large  // (1.0 ~ 2.0) * buffer size
        case medium  // (0.2 ~ 1.0) * buffer size
        case small  // Less than 16 characters
    }

    let lineFeed: [UInt8] = [0x0A]
    let carriageReturn: [UInt8] = [0x0D]
    let newLineCharacters: [[UInt8]] = [
        lineFeed,  // Line feed
        [0x0B],  // Vertical tab
        [0x0C],  // Form feed
        carriageReturn,  // Carriage return
        [0x0D, 0x0A],  // Carriage return + Line feed
        [0xC2, 0x85],  // New line
        [0xE2, 0x80, 0xA8],  // Line Separator
        [0xE2, 0x80, 0xA9],  // Paragraph separator
    ]

    // Generate the content of one line (without its terminator)
    func generateString(size: TestCaseSize) -> [UInt8] {
        // Basic Latin has the range U+0020 ... U+007E
        let range: ClosedRange<UInt8> = 0x20 ... 0x7E

        let length: Int
        switch size {
        case .large:
            length = Int(Double.random(in: 1.0 ..< 2.0) * Double(readBufferSize))
        case .medium:
            length = Int(Double.random(in: 0.2 ..< 1.0) * Double(readBufferSize))
        case .small:
            length = Int.random(in: 0 ..< 16)
        }
        return (0 ..< length).map { _ in UInt8.random(in: range) }
    }

    // Generate `count` test cases spread evenly over all sizes, in random order
    func generateTestCases(count: Int) -> [TestCase] {
        let sizes = TestCaseSize.allCases
        var targetSizes: [TestCaseSize] = sizes.flatMap {
            Array(repeating: $0, count: count / sizes.count)
        }
        // Fill the remainder
        let remaining = count - targetSizes.count
        targetSizes.append(contentsOf: sizes.shuffled().prefix(remaining))
        // Do a final shuffle to achieve random order
        targetSizes.shuffle()

        // Now generate test cases based on sizes
        var testCases: [TestCase] = []
        var previousNewLine: [UInt8] = []
        for size in targetSizes {
            let components = generateString(size: size)
            // A line ending in a lone CR followed by an empty line ending in
            // LF is written to the file as "CR LF", which reads back as ONE
            // terminator. Exclude that combination so the expected lines
            // stay unambiguous.
            let candidates =
                (components.isEmpty && previousNewLine == carriageReturn)
                ? newLineCharacters.filter { $0 != lineFeed }
                : newLineCharacters
            // Choose a random new line
            let newLine = candidates.randomElement()!
            previousNewLine = newLine
            testCases.append(
                (
                    value: String(decoding: components + newLine, as: UTF8.self),
                    count: components.count + newLine.count,
                    newLine: String(decoding: newLine, as: UTF8.self)
                )
            )
        }
        return testCases
    }

    func writeTestCasesToFile(_ testCases: [TestCase], at url: URL) throws {
        #if canImport(Darwin)
        FileManager.default.createFile(atPath: url.path(), contents: nil, attributes: nil)
        let fileHandle = try FileHandle(forWritingTo: url)
        for testCase in testCases {
            fileHandle.write(testCase.value.data(using: .utf8)!)
        }
        try fileHandle.close()
        #else
        let content = testCases.map { $0.value }.joined()
        try content.write(to: url, atomically: true, encoding: .utf8)
        #endif
    }

    let testCaseCount = 60
    let testFilePath = URL.temporaryDirectory.appending(path: "NewLines.txt")
    if FileManager.default.fileExists(atPath: testFilePath.path()) {
        try FileManager.default.removeItem(at: testFilePath)
    }
    // Do not leave the generated file behind, whether the test passes or not
    defer { try? FileManager.default.removeItem(at: testFilePath) }

    let testCases = generateTestCases(count: testCaseCount)
    try writeTestCasesToFile(testCases, at: testFilePath)

    _ = try await Subprocess.run(
        .path("/bin/cat"),
        arguments: [testFilePath.path()],
        error: .discarded
    ) { execution, standardOutput in
        var index = 0
        for try await line in standardOutput.lines(encoding: UTF8.self) {
            // Stop instead of trapping on an out-of-bounds subscript if the
            // sequence produces more lines than were written.
            try #require(
                index < testCases.count,
                "Received more lines than the \(testCases.count) that were written"
            )
            #expect(
                line == testCases[index].value,
                """
                Found mistachig line at index \(index)
                Expected: [\(testCases[index].value)]
                Actual: [\(line)]
                Line Ending \(Array(testCases[index].newLine.utf8))
                """
            )
            index += 1
        }
        // Every written line must have been returned
        #expect(
            index == testCases.count,
            "Expected \(testCases.count) lines but received \(index)"
        )
    }
}
9361054
}
9371055

9381056
// MARK: - Utils

0 commit comments

Comments
 (0)