@@ -29,19 +29,17 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
29
29
public typealias Element = SequenceOutput . Buffer
30
30
31
31
private let diskIO : TrackedPlatformDiskIO
32
- private let bufferSize : Int
33
32
private var buffer : [ UInt8 ]
34
33
private var currentPosition : Int
35
34
private var finished : Bool
36
- private var streamIterator : AsyncThrowingStream < StreamStatus , Swift . Error > . AsyncIterator
35
+ private var streamIterator : AsyncThrowingStream < TrackedPlatformDiskIO . StreamStatus , Swift . Error > . AsyncIterator
37
36
38
- internal init ( diskIO: TrackedPlatformDiskIO , bufferSize : Int ) {
37
+ internal init ( diskIO: TrackedPlatformDiskIO ) {
39
38
self . diskIO = diskIO
40
- self . bufferSize = bufferSize
41
39
self . buffer = [ ]
42
40
self . currentPosition = 0
43
41
self . finished = false
44
- self . streamIterator = Self . createDataStream ( with : diskIO . dispatchIO , bufferSize : bufferSize ) . makeAsyncIterator ( )
42
+ self . streamIterator = diskIO . readDataStream ( upToLength : readBufferSize ) . makeAsyncIterator ( )
45
43
}
46
44
47
45
public mutating func next( ) async throws -> SequenceOutput . Buffer ? {
@@ -51,7 +49,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
51
49
return data
52
50
53
51
case . endOfStream( let data) :
54
- streamIterator = Self . createDataStream ( with : diskIO . dispatchIO , bufferSize : bufferSize ) . makeAsyncIterator ( )
52
+ streamIterator = diskIO . readDataStream ( upToLength : readBufferSize ) . makeAsyncIterator ( )
55
53
return data
56
54
57
55
case . endOfFile:
@@ -63,66 +61,24 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
63
61
return nil
64
62
}
65
63
}
66
-
67
- private enum StreamStatus {
68
- case data( SequenceOutput . Buffer )
69
- case endOfStream( SequenceOutput . Buffer )
70
- case endOfFile
71
- }
72
-
73
- private static func createDataStream( with dispatchIO: DispatchIO , bufferSize: Int ) -> AsyncThrowingStream < StreamStatus , Swift . Error > {
74
- return AsyncThrowingStream < StreamStatus , Swift . Error > { continuation in
75
- dispatchIO. read (
76
- offset: 0 ,
77
- length: bufferSize,
78
- queue: . global( )
79
- ) { done, data, error in
80
- if error != 0 {
81
- continuation. finish ( throwing: SubprocessError (
82
- code: . init( . failedToReadFromSubprocess) ,
83
- underlyingError: . init( rawValue: error)
84
- ) )
85
- return
86
- }
87
-
88
- // Treat empty data and nil as the same
89
- let buffer = data. map { $0. isEmpty ? nil : $0 } ?? nil
90
- let status : StreamStatus
91
-
92
- switch ( buffer, done) {
93
- case ( . some( let data) , false ) :
94
- status = . data( SequenceOutput . Buffer ( data: data) )
95
-
96
- case ( . some( let data) , true ) :
97
- status = . endOfStream( SequenceOutput . Buffer ( data: data) )
98
-
99
- case ( nil , false ) :
100
- return
101
-
102
- case ( nil , true ) :
103
- status = . endOfFile
104
- }
105
-
106
- continuation. yield ( status)
107
-
108
- if done {
109
- continuation. finish ( )
110
- }
111
- }
112
- }
113
- }
114
64
}
115
65
116
66
private let diskIO : TrackedPlatformDiskIO
117
- private let bufferSize : Int
118
67
119
- internal init ( diskIO: TrackedPlatformDiskIO , bufferSize : Int ) {
68
+ internal init ( diskIO: TrackedPlatformDiskIO ) {
120
69
self . diskIO = diskIO
121
- self . bufferSize = bufferSize
122
70
}
123
71
124
72
public func makeAsyncIterator( ) -> Iterator {
125
- return Iterator ( diskIO: self . diskIO, bufferSize: bufferSize)
73
+ return Iterator ( diskIO: self . diskIO)
74
+ }
75
+ }
76
+
77
+ extension TrackedPlatformDiskIO {
78
+ internal enum StreamStatus {
79
+ case data( SequenceOutput . Buffer )
80
+ case endOfStream( SequenceOutput . Buffer )
81
+ case endOfFile
126
82
}
127
83
}
128
84
0 commit comments