Skip to content

Commit 6162ffe

Browse files
committed
Track IOChannels that are not guaranteed to be async capable, and fall back to synchronous reads for Windows
1 parent 56bd69c commit 6162ffe

File tree

4 files changed

+88
-12
lines changed

4 files changed

+88
-12
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
4242

4343
private let diskIO: DiskIO
4444
private let preferredBufferSize: Int
45+
private let isAsyncIO: Bool
4546
private var buffer: [Buffer]
4647

47-
internal init(diskIO: DiskIO, preferredBufferSize: Int?) {
48+
internal init(diskIO: DiskIO, preferredBufferSize: Int?, isAsyncIO: Bool) {
4849
self.diskIO = diskIO
4950
self.buffer = []
51+
self.isAsyncIO = isAsyncIO
5052
self.preferredBufferSize = preferredBufferSize ?? readBufferSize
5153
}
5254

@@ -60,7 +62,8 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
6062
// Read more data
6163
let data = try await AsyncIO.shared.read(
6264
from: self.diskIO,
63-
upTo: self.preferredBufferSize
65+
upTo: self.preferredBufferSize,
66+
isAsyncIO: self.isAsyncIO
6467
)
6568
guard let data else {
6669
// We finished reading. Close the file descriptor now
@@ -87,17 +90,20 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
8790

8891
private let diskIO: DiskIO
8992
private let preferredBufferSize: Int?
93+
private let isAsyncIO: Bool
9094

91-
internal init(diskIO: DiskIO, preferredBufferSize: Int?) {
95+
internal init(diskIO: DiskIO, preferredBufferSize: Int?, isAsyncIO: Bool = true) {
9296
self.diskIO = diskIO
9397
self.preferredBufferSize = preferredBufferSize
98+
self.isAsyncIO = isAsyncIO
9499
}
95100

96101
/// Creates a iterator for this asynchronous sequence.
97102
public func makeAsyncIterator() -> Iterator {
98103
return Iterator(
99104
diskIO: self.diskIO,
100-
preferredBufferSize: self.preferredBufferSize
105+
preferredBufferSize: self.preferredBufferSize,
106+
isAsyncIO: self.isAsyncIO,
101107
)
102108
}
103109

Sources/Subprocess/IO/AsyncIO+Dispatch.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,20 @@ final class AsyncIO: Sendable {
3131

3232
internal func read(
3333
from diskIO: borrowing IOChannel,
34-
upTo maxLength: Int
34+
upTo maxLength: Int,
35+
isAsyncIO: Bool = true,
3536
) async throws -> DispatchData? {
3637
return try await self.read(
3738
from: diskIO.channel,
3839
upTo: maxLength,
40+
isAsyncIO: isAsyncIO,
3941
)
4042
}
4143

4244
internal func read(
4345
from dispatchIO: DispatchIO,
44-
upTo maxLength: Int
46+
upTo maxLength: Int,
47+
isAsyncIO: Bool,
4548
) async throws -> DispatchData? {
4649
return try await withCheckedThrowingContinuation { continuation in
4750
var buffer: DispatchData = .empty

Sources/Subprocess/IO/AsyncIO+Linux.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -367,14 +367,16 @@ extension AsyncIO {
367367

368368
func read(
369369
from diskIO: borrowing IOChannel,
370-
upTo maxLength: Int
370+
upTo maxLength: Int,
371+
isAsyncIO: Bool = true,
371372
) async throws -> [UInt8]? {
372-
return try await self.read(from: diskIO.channel, upTo: maxLength)
373+
return try await self.read(from: diskIO.channel, upTo: maxLength, isAsyncIO: isAsyncIO)
373374
}
374375

375376
func read(
376377
from fileDescriptor: FileDescriptor,
377-
upTo maxLength: Int
378+
upTo maxLength: Int,
379+
isAsyncIO: Bool,
378380
) async throws -> [UInt8]? {
379381
guard maxLength > 0 else {
380382
return nil

Sources/Subprocess/IO/AsyncIO+Windows.swift

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,14 +245,16 @@ final class AsyncIO: @unchecked Sendable {
245245

246246
func read(
247247
from diskIO: borrowing IOChannel,
248-
upTo maxLength: Int
248+
upTo maxLength: Int,
249+
isAsyncIO: Bool = true,
249250
) async throws -> [UInt8]? {
250-
return try await self.read(from: diskIO.channel, upTo: maxLength)
251+
return try await self.read(from: diskIO.channel, upTo: maxLength, isAsyncIO: isAsyncIO)
251252
}
252253

253254
func read(
254255
from handle: HANDLE,
255-
upTo maxLength: Int
256+
upTo maxLength: Int,
257+
isAsyncIO: Bool,
256258
) async throws -> [UInt8]? {
257259
guard maxLength > 0 else {
258260
return nil
@@ -264,7 +266,70 @@ final class AsyncIO: @unchecked Sendable {
264266
var resultBuffer: [UInt8] = Array(
265267
repeating: 0, count: bufferLength
266268
)
269+
267270
var readLength: Int = 0
271+
272+
// We can't be certain that the HANDLE has overlapping I/O enabled on it, so
273+
// here we fall back to synchronous reads.
274+
guard isAsyncIO else {
275+
while true {
276+
let (succeed, bytesRead) = try resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in
277+
// Get a pointer to the memory at the specified offset
278+
// Windows ReadFile uses DWORD for target count, which means we can only
279+
// read up to DWORD (aka UInt32) max.
280+
let targetCount: DWORD = self.calculateRemainingCount(
281+
totalCount: bufferPointer.count,
282+
readCount: readLength
283+
)
284+
285+
var bytesRead = UInt32(0)
286+
let offsetAddress = bufferPointer.baseAddress!.advanced(by: readLength)
287+
// Read directly into the buffer at the offset
288+
return (
289+
ReadFile(
290+
handle,
291+
offsetAddress,
292+
targetCount,
293+
&bytesRead,
294+
nil
295+
), bytesRead
296+
)
297+
}
298+
299+
guard succeed else {
300+
let error = SubprocessError(
301+
code: .init(.failedToReadFromSubprocess),
302+
underlyingError: .init(rawValue: GetLastError())
303+
)
304+
throw error
305+
}
306+
307+
if bytesRead == 0 {
308+
// We reached EOF. Return whatever's left
309+
guard readLength > 0 else {
310+
return nil
311+
}
312+
resultBuffer.removeLast(resultBuffer.count - readLength)
313+
return resultBuffer
314+
} else {
315+
// Read some data
316+
readLength += Int(truncatingIfNeeded: bytesRead)
317+
if maxLength == .max {
318+
// Grow resultBuffer if needed
319+
guard Double(readLength) > 0.8 * Double(resultBuffer.count) else {
320+
continue
321+
}
322+
resultBuffer.append(
323+
contentsOf: Array(repeating: 0, count: resultBuffer.count)
324+
)
325+
} else if readLength >= maxLength {
326+
// When we reached maxLength, return!
327+
return resultBuffer
328+
}
329+
}
330+
}
331+
}
332+
268333
var signalStream = self.registerHandle(handle).makeAsyncIterator()
269334

270335
while true {

0 commit comments

Comments
 (0)