Skip to content

Commit 05c808d

Browse files
authored
Introduce AsyncIO for Windows and Linux (#117)
* Create platform specific AsyncIO - Darwin: based on DispatchIO - Linux: based on epoll - Windows (not included in this commit): based on IOCP with OVERLAPPED * Refactor captureOutput() to minimize force unwrap * Create platform specific AsyncIO - Darwin: based on DispatchIO - Linux: based on epoll - Windows (not included in this commit): based on IOCP with OVERLAPPED * Fix fd was not closed error on Windows * Fix Windows test errors * Introduce Windows IOCP based AsyncIO implementation * Move platform specific AsyncIO implementations to separate files * Use _beginthreadex instead of CreatThread on Windows for AsyncIO * Remove DispatchData._ContiguousBufferView since Linux no longer relies on DispatchIO and we can just use DispatchData.Region on Darwin * Linux: reap child process if fork succeed but exec fails
1 parent 5715ed4 commit 05c808d

24 files changed

+1708
-661
lines changed

Package.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import PackageDescription
66
var dep: [Package.Dependency] = [
77
.package(
88
url: "https://github.com/apple/swift-system",
9-
from: "1.5.0"
9+
// Temporarily pin to 1.5.0 because 1.6.0 has a breaking change for Ubuntu Focal
10+
// https://github.com/apple/swift-system/issues/237
11+
exact: "1.5.0"
1012
)
1113
]
1214
#if !os(Windows)

Sources/Subprocess/API.swift

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ public func run<
105105
output: try output.createPipe(),
106106
error: try error.createPipe()
107107
) { execution, inputIO, outputIO, errorIO in
108-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
109-
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
110-
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
108+
var inputIOBox: IOChannel? = consume inputIO
109+
var outputIOBox: IOChannel? = consume outputIO
110+
var errorIOBox: IOChannel? = consume errorIO
111111

112112
// Write input, capture output and error in parallel
113113
async let stdout = try output.captureOutput(from: outputIOBox.take())
@@ -177,12 +177,12 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol, Error: Out
177177
output: try output.createPipe(),
178178
error: try error.createPipe()
179179
) { execution, inputIO, outputIO, errorIO in
180-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
180+
var inputIOBox: IOChannel? = consume inputIO
181181
return try await withThrowingTaskGroup(
182182
of: Void.self,
183183
returning: Result.self
184184
) { group in
185-
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
185+
var inputIOContainer: IOChannel? = inputIOBox.take()
186186
group.addTask {
187187
if let inputIO = inputIOContainer.take() {
188188
let writer = StandardInputWriter(diskIO: inputIO)
@@ -237,13 +237,13 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
237237
output: try output.createPipe(),
238238
error: try error.createPipe()
239239
) { execution, inputIO, outputIO, errorIO in
240-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
241-
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
240+
var inputIOBox: IOChannel? = consume inputIO
241+
var outputIOBox: IOChannel? = consume outputIO
242242
return try await withThrowingTaskGroup(
243243
of: Void.self,
244244
returning: Result.self
245245
) { group in
246-
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
246+
var inputIOContainer: IOChannel? = inputIOBox.take()
247247
group.addTask {
248248
if let inputIO = inputIOContainer.take() {
249249
let writer = StandardInputWriter(diskIO: inputIO)
@@ -253,7 +253,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
253253
}
254254

255255
// Body runs in the same isolation
256-
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO())
256+
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeIOChannel())
257257
let result = try await body(execution, outputSequence)
258258
try await group.waitForAll()
259259
return result
@@ -299,13 +299,13 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
299299
output: try output.createPipe(),
300300
error: try error.createPipe()
301301
) { execution, inputIO, outputIO, errorIO in
302-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
303-
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
302+
var inputIOBox: IOChannel? = consume inputIO
303+
var errorIOBox: IOChannel? = consume errorIO
304304
return try await withThrowingTaskGroup(
305305
of: Void.self,
306306
returning: Result.self
307307
) { group in
308-
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
308+
var inputIOContainer: IOChannel? = inputIOBox.take()
309309
group.addTask {
310310
if let inputIO = inputIOContainer.take() {
311311
let writer = StandardInputWriter(diskIO: inputIO)
@@ -315,7 +315,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
315315
}
316316

317317
// Body runs in the same isolation
318-
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO())
318+
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeIOChannel())
319319
let result = try await body(execution, errorSequence)
320320
try await group.waitForAll()
321321
return result
@@ -363,7 +363,7 @@ public func run<Result, Error: OutputProtocol>(
363363
error: try error.createPipe()
364364
) { execution, inputIO, outputIO, errorIO in
365365
let writer = StandardInputWriter(diskIO: inputIO!)
366-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
366+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
367367
return try await body(execution, writer, outputSequence)
368368
}
369369
}
@@ -408,7 +408,7 @@ public func run<Result, Output: OutputProtocol>(
408408
error: try error.createPipe()
409409
) { execution, inputIO, outputIO, errorIO in
410410
let writer = StandardInputWriter(diskIO: inputIO!)
411-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
411+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
412412
return try await body(execution, writer, errorSequence)
413413
}
414414
}
@@ -460,8 +460,8 @@ public func run<Result>(
460460
error: try error.createPipe()
461461
) { execution, inputIO, outputIO, errorIO in
462462
let writer = StandardInputWriter(diskIO: inputIO!)
463-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
464-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
463+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
464+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
465465
return try await body(execution, writer, outputSequence, errorSequence)
466466
}
467467
}
@@ -497,16 +497,16 @@ public func run<
497497
error: try error.createPipe()
498498
) { (execution, inputIO, outputIO, errorIO) -> RunResult in
499499
// Write input, capture output and error in parallel
500-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
501-
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
502-
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
500+
var inputIOBox: IOChannel? = consume inputIO
501+
var outputIOBox: IOChannel? = consume outputIO
502+
var errorIOBox: IOChannel? = consume errorIO
503503
return try await withThrowingTaskGroup(
504504
of: OutputCapturingState<Output.OutputType, Error.OutputType>?.self,
505505
returning: RunResult.self
506506
) { group in
507-
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
508-
var outputIOContainer: TrackedPlatformDiskIO? = outputIOBox.take()
509-
var errorIOContainer: TrackedPlatformDiskIO? = errorIOBox.take()
507+
var inputIOContainer: IOChannel? = inputIOBox.take()
508+
var outputIOContainer: IOChannel? = outputIOBox.take()
509+
var errorIOContainer: IOChannel? = errorIOBox.take()
510510
group.addTask {
511511
if let writeFd = inputIOContainer.take() {
512512
let writer = StandardInputWriter(diskIO: writeFd)
@@ -580,8 +580,8 @@ public func run<Result>(
580580
error: try error.createPipe()
581581
) { execution, inputIO, outputIO, errorIO in
582582
let writer = StandardInputWriter(diskIO: inputIO!)
583-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
584-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
583+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
584+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
585585
return try await body(execution, writer, outputSequence, errorSequence)
586586
}
587587
}

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919
internal import Dispatch
2020
#endif
2121

22-
public struct AsyncBufferSequence: AsyncSequence, Sendable {
22+
public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
2323
public typealias Failure = any Swift.Error
2424
public typealias Element = Buffer
2525

26-
#if os(Windows)
27-
internal typealias DiskIO = FileDescriptor
28-
#else
26+
#if canImport(Darwin)
2927
internal typealias DiskIO = DispatchIO
28+
#elseif canImport(WinSDK)
29+
internal typealias DiskIO = HANDLE
30+
#else
31+
internal typealias DiskIO = FileDescriptor
3032
#endif
3133

3234
@_nonSendable
@@ -47,15 +49,18 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
4749
return self.buffer.removeFirst()
4850
}
4951
// Read more data
50-
let data = try await self.diskIO.read(
51-
upToLength: readBufferSize
52+
let data = try await AsyncIO.shared.read(
53+
from: self.diskIO,
54+
upTo: readBufferSize
5255
)
5356
guard let data else {
5457
// We finished reading. Close the file descriptor now
55-
#if os(Windows)
56-
try self.diskIO.close()
58+
#if canImport(Darwin)
59+
try _safelyClose(.dispatchIO(self.diskIO))
60+
#elseif canImport(WinSDK)
61+
try _safelyClose(.handle(self.diskIO))
5762
#else
58-
self.diskIO.close()
63+
try _safelyClose(.fileDescriptor(self.diskIO))
5964
#endif
6065
return nil
6166
}
@@ -132,17 +137,7 @@ extension AsyncBufferSequence {
132137
self.eofReached = true
133138
return nil
134139
}
135-
#if os(Windows)
136-
// Cast data to CodeUnit type
137-
let result = buffer.withUnsafeBytes { ptr in
138-
return Array(
139-
UnsafeBufferPointer<Encoding.CodeUnit>(
140-
start: ptr.bindMemory(to: Encoding.CodeUnit.self).baseAddress!,
141-
count: ptr.count / MemoryLayout<Encoding.CodeUnit>.size
142-
)
143-
)
144-
}
145-
#else
140+
#if canImport(Darwin)
146141
// Unfortunately here we _have to_ copy the bytes out because
147142
// DispatchIO (rightfully) reuses buffer, which means `buffer.data`
148143
// has the same address on all iterations, therefore we can't directly
@@ -157,7 +152,13 @@ extension AsyncBufferSequence {
157152
UnsafeBufferPointer(start: ptr.baseAddress?.assumingMemoryBound(to: Encoding.CodeUnit.self), count: elementCount)
158153
)
159154
}
160-
155+
#else
156+
// Cast data to CodeUnit type
157+
let result = buffer.withUnsafeBytes { ptr in
158+
return ptr.withMemoryRebound(to: Encoding.CodeUnit.self) { codeUnitPtr in
159+
return Array(codeUnitPtr)
160+
}
161+
}
161162
#endif
162163
return result.isEmpty ? nil : result
163164
}
@@ -340,7 +341,7 @@ private let _pageSize: Int = {
340341
Int(_subprocess_vm_size())
341342
}()
342343
#elseif canImport(WinSDK)
343-
import WinSDK
344+
@preconcurrency import WinSDK
344345
private let _pageSize: Int = {
345346
var sysInfo: SYSTEM_INFO = SYSTEM_INFO()
346347
GetSystemInfo(&sysInfo)

Sources/Subprocess/Buffer.swift

Lines changed: 30 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,35 @@
1717
extension AsyncBufferSequence {
1818
/// A immutable collection of bytes
1919
public struct Buffer: Sendable {
20-
#if os(Windows)
21-
internal let data: [UInt8]
22-
23-
internal init(data: [UInt8]) {
24-
self.data = data
25-
}
26-
27-
internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
28-
return [.init(data: data)]
29-
}
30-
#else
31-
// We need to keep the backingData alive while _ContiguousBufferView is alive
20+
#if canImport(Darwin)
21+
// We need to keep the backingData alive while Slice is alive
3222
internal let backingData: DispatchData
33-
internal let data: DispatchData._ContiguousBufferView
23+
internal let data: DispatchData.Region
3424

35-
internal init(data: DispatchData._ContiguousBufferView, backingData: DispatchData) {
25+
internal init(data: DispatchData.Region, backingData: DispatchData) {
3626
self.data = data
3727
self.backingData = backingData
3828
}
3929

4030
internal static func createFrom(_ data: DispatchData) -> [Buffer] {
41-
let slices = data.contiguousBufferViews
31+
let slices = data.regions
4232
// In most (all?) cases data should only have one slice
4333
if _fastPath(slices.count == 1) {
4434
return [.init(data: slices[0], backingData: data)]
4535
}
4636
return slices.map{ .init(data: $0, backingData: data) }
4737
}
48-
#endif
38+
#else
39+
internal let data: [UInt8]
40+
41+
internal init(data: [UInt8]) {
42+
self.data = data
43+
}
44+
45+
internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
46+
return [.init(data: data)]
47+
}
48+
#endif // canImport(Darwin)
4949
}
5050
}
5151

@@ -92,55 +92,33 @@ extension AsyncBufferSequence.Buffer {
9292

9393
// MARK: - Hashable, Equatable
9494
extension AsyncBufferSequence.Buffer: Equatable, Hashable {
95-
#if os(Windows)
96-
// Compiler generated conformances
97-
#else
95+
#if canImport(Darwin)
9896
public static func == (lhs: AsyncBufferSequence.Buffer, rhs: AsyncBufferSequence.Buffer) -> Bool {
99-
return lhs.data.elementsEqual(rhs.data)
97+
return lhs.data == rhs.data
10098
}
10199

102100
public func hash(into hasher: inout Hasher) {
103-
self.data.withUnsafeBytes { ptr in
104-
hasher.combine(bytes: ptr)
105-
}
101+
return self.data.hash(into: &hasher)
106102
}
107103
#endif
104+
// else Compiler generated conformances
108105
}
109106

110-
// MARK: - DispatchData.Block
111-
#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl)
112-
extension DispatchData {
113-
/// Unfortunately `DispatchData.Region` is not available on Linux, hence our own wrapper
114-
internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection {
115-
typealias Element = UInt8
116-
117-
internal let bytes: UnsafeBufferPointer<UInt8>
118-
119-
internal var startIndex: Int { self.bytes.startIndex }
120-
internal var endIndex: Int { self.bytes.endIndex }
121-
122-
internal init(bytes: UnsafeBufferPointer<UInt8>) {
123-
self.bytes = bytes
124-
}
125-
126-
internal func withUnsafeBytes<ResultType>(_ body: (UnsafeRawBufferPointer) throws -> ResultType) rethrows -> ResultType {
127-
return try body(UnsafeRawBufferPointer(self.bytes))
128-
}
129-
130-
subscript(position: Int) -> UInt8 {
131-
_read {
132-
yield self.bytes[position]
107+
#if canImport(Darwin)
108+
extension DispatchData.Region {
109+
static func == (lhs: DispatchData.Region, rhs: DispatchData.Region) -> Bool {
110+
return lhs.withUnsafeBytes { lhsBytes in
111+
return rhs.withUnsafeBytes { rhsBytes in
112+
return lhsBytes.elementsEqual(rhsBytes)
133113
}
134114
}
135115
}
136116

137-
internal var contiguousBufferViews: [_ContiguousBufferView] {
138-
var slices = [_ContiguousBufferView]()
139-
enumerateBytes { (bytes, index, stop) in
140-
slices.append(_ContiguousBufferView(bytes: bytes))
117+
internal func hash(into hasher: inout Hasher) {
118+
return self.withUnsafeBytes { ptr in
119+
return hasher.combine(bytes: ptr)
141120
}
142-
return slices
143121
}
144122
}
145-
146123
#endif
124+

Sources/Subprocess/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ add_library(Subprocess
1717
Result.swift
1818
IO/Output.swift
1919
IO/Input.swift
20+
IO/AsyncIO+Darwin.swift
21+
IO/AsyncIO+Linux.swift
22+
IO/AsyncIO+Windows.swift
2023
Span+Subprocess.swift
2124
AsyncBufferSequence.swift
2225
API.swift

0 commit comments

Comments
 (0)