Skip to content

Introduce AsyncIO for Windows and Linux #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jul 29, 2025
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import PackageDescription
var dep: [Package.Dependency] = [
.package(
url: "https://github.com/apple/swift-system",
from: "1.5.0"
exact: "1.5.0"
)
]
#if !os(Windows)
Expand Down
55 changes: 14 additions & 41 deletions Sources/Subprocess/Buffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ extension AsyncBufferSequence {
#if canImport(Darwin)
// We need to keep the backingData alive while Slice is alive
internal let backingData: DispatchData
internal let data: DispatchData._ContiguousBufferView
internal let data: DispatchData.Region

internal init(data: DispatchData._ContiguousBufferView, backingData: DispatchData) {
internal init(data: DispatchData.Region, backingData: DispatchData) {
self.data = data
self.backingData = backingData
}

internal static func createFrom(_ data: DispatchData) -> [Buffer] {
let slices = data.contiguousBufferViews
let slices = data.regions
// In most (all?) cases data should only have one slice
if _fastPath(slices.count == 1) {
return [.init(data: slices[0], backingData: data)]
Expand Down Expand Up @@ -98,54 +98,27 @@ extension AsyncBufferSequence.Buffer: Equatable, Hashable {
}

public func hash(into hasher: inout Hasher) {
hasher.combine(self.data)
return self.data.hash(into: &hasher)
}
#endif
// else Compiler generated conformances
}

// MARK: - DispatchData.Block
#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl)
extension DispatchData {
/// Unfortunately `DispatchData.Region` is not available on Linux, hence our own wrapper
internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection, Hashable {
typealias Element = UInt8

internal let bytes: UnsafeBufferPointer<UInt8>

internal var startIndex: Int { self.bytes.startIndex }
internal var endIndex: Int { self.bytes.endIndex }

internal init(bytes: UnsafeBufferPointer<UInt8>) {
self.bytes = bytes
}

internal func withUnsafeBytes<ResultType>(_ body: (UnsafeRawBufferPointer) throws -> ResultType) rethrows -> ResultType {
return try body(UnsafeRawBufferPointer(self.bytes))
}

internal func hash(into hasher: inout Hasher) {
hasher.combine(bytes: UnsafeRawBufferPointer(self.bytes))
}

internal static func == (lhs: DispatchData._ContiguousBufferView, rhs: DispatchData._ContiguousBufferView) -> Bool {
return lhs.bytes.elementsEqual(rhs.bytes)
}

subscript(position: Int) -> UInt8 {
_read {
yield self.bytes[position]
#if canImport(Darwin)
/// `DispatchData.Region` gives contiguous-byte access; these members provide
/// byte-wise equality and hashing so `Buffer` can derive `Equatable`/`Hashable`.
extension DispatchData.Region {
    /// Two regions are equal when their raw byte contents are element-wise equal.
    static func == (lhs: DispatchData.Region, rhs: DispatchData.Region) -> Bool {
        return lhs.withUnsafeBytes { lhsBytes in
            return rhs.withUnsafeBytes { rhsBytes in
                return lhsBytes.elementsEqual(rhsBytes)
            }
        }
    }

    /// Hashes the region by feeding its raw bytes into the hasher,
    /// consistent with `==` above (equal bytes produce equal hashes).
    internal func hash(into hasher: inout Hasher) {
        return self.withUnsafeBytes { ptr in
            return hasher.combine(bytes: ptr)
        }
    }
}

#endif

159 changes: 95 additions & 64 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import Musl

internal import Dispatch

import Synchronization

/// A collection of configurations parameters to use when
/// spawning a subprocess.
public struct Configuration: Sendable {
Expand Down Expand Up @@ -775,6 +777,16 @@ internal struct IOChannel: ~Copyable, @unchecked Sendable {
}
}

#if canImport(WinSDK)
/// Process-wide source of unique suffixes for named-pipe names.
/// Windows named pipes are system wide, so each pipe created by this
/// process needs a distinct name component.
internal enum PipeNameCounter {
    // Monotonically increasing counter; relaxed ordering is sufficient
    // because only uniqueness is required, not synchronization.
    private static let counter = Atomic<UInt64>(0)

    /// Returns the next unique value (first call yields 1).
    internal static func nextValue() -> UInt64 {
        let (_, incremented) = self.counter.add(1, ordering: .relaxed)
        return incremented
    }
}
#endif

internal struct CreatedPipe: ~Copyable {
internal enum Purpose: CustomStringConvertible {
/// This pipe is used for standard input. This option maps to
Expand Down Expand Up @@ -817,77 +829,96 @@ internal struct CreatedPipe: ~Copyable {

internal init(closeWhenDone: Bool, purpose: Purpose) throws {
#if canImport(WinSDK)
// On Windows, we need to create a named pipe
let pipeName = "\\\\.\\pipe\\subprocess-\(purpose)-\(Int.random(in: .min ..< .max))"
var saAttributes: SECURITY_ATTRIBUTES = SECURITY_ATTRIBUTES()
saAttributes.nLength = DWORD(MemoryLayout<SECURITY_ATTRIBUTES>.size)
saAttributes.bInheritHandle = true
saAttributes.lpSecurityDescriptor = nil

let parentEnd = pipeName.withCString(
encodedAs: UTF16.self
) { pipeNameW in
// Use OVERLAPPED for async IO
var openMode: DWORD = DWORD(FILE_FLAG_OVERLAPPED)
switch purpose {
case .input:
openMode |= DWORD(PIPE_ACCESS_OUTBOUND)
case .output:
openMode |= DWORD(PIPE_ACCESS_INBOUND)
/// On Windows, we need to create a named pipe.
/// According to Microsoft documentation:
/// > Asynchronous (overlapped) read and write operations are
/// > not supported by anonymous pipes.
/// See https://learn.microsoft.com/en-us/windows/win32/ipc/anonymous-pipe-operations
while true {
/// Windows named pipes are system wide. To avoid creating two pipes with the same
/// name, create the pipe with `FILE_FLAG_FIRST_PIPE_INSTANCE` such that it will
/// return error `ERROR_ACCESS_DENIED` if we try to create another pipe with the same name.
let pipeName = "\\\\.\\pipe\\LOCAL\\subprocess-\(purpose)-\(PipeNameCounter.nextValue())"
var saAttributes: SECURITY_ATTRIBUTES = SECURITY_ATTRIBUTES()
saAttributes.nLength = DWORD(MemoryLayout<SECURITY_ATTRIBUTES>.size)
saAttributes.bInheritHandle = true
saAttributes.lpSecurityDescriptor = nil

let parentEnd = pipeName.withCString(
encodedAs: UTF16.self
) { pipeNameW in
// Use OVERLAPPED for async IO
var openMode: DWORD = DWORD(FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE)
switch purpose {
case .input:
openMode |= DWORD(PIPE_ACCESS_OUTBOUND)
case .output:
openMode |= DWORD(PIPE_ACCESS_INBOUND)
}

return CreateNamedPipeW(
pipeNameW,
openMode,
DWORD(PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT),
1, // Max instance,
DWORD(readBufferSize),
DWORD(readBufferSize),
0,
&saAttributes
)
}
Comment on lines +849 to +869
Copy link

@al45tair al45tair Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, one final thing about this code (sorry). I think we should at least add PIPE_REJECT_REMOTE_CLIENTS to the flags.

Ideally I think we'd specify an ACL for the named pipe, to make it harder to interfere with this code by having some other process call CreateFile between the CreateNamedPipeW and CreateFileW calls here, but the window for mischief is quite small and PIPE_REJECT_REMOTE_CLIENTS would at least restrict it to things already running on the current machine.

guard let parentEnd, parentEnd != INVALID_HANDLE_VALUE else {
// Since we created the pipe with `FILE_FLAG_FIRST_PIPE_INSTANCE`,
// if there's already a pipe with the same name, GetLastError()
// will be set to ERROR_ACCESS_DENIED. In this case,
// try again with a different name.
let errorCode = GetLastError()
guard errorCode != FILE_FLAG_FIRST_PIPE_INSTANCE else {
continue
}
// Throw all other errors
throw SubprocessError(
code: .init(.asyncIOFailed("CreateNamedPipeW failed")),
underlyingError: .init(rawValue: GetLastError())
)
}

return CreateNamedPipeW(
pipeNameW,
openMode,
DWORD(PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT),
1, // Max instance,
DWORD(readBufferSize),
DWORD(readBufferSize),
0,
&saAttributes
)
}
guard let parentEnd, parentEnd != INVALID_HANDLE_VALUE else {
throw SubprocessError(
code: .init(.asyncIOFailed("CreateNamedPipeW failed")),
underlyingError: .init(rawValue: GetLastError())
)
}
let childEnd = pipeName.withCString(
encodedAs: UTF16.self
) { pipeNameW in
var targetAccess: DWORD = 0
switch purpose {
case .input:
targetAccess = DWORD(GENERIC_READ)
case .output:
targetAccess = DWORD(GENERIC_WRITE)
}

let childEnd = pipeName.withCString(
encodedAs: UTF16.self
) { pipeNameW in
var targetAccess: DWORD = 0
return CreateFileW(
pipeNameW,
targetAccess,
0,
&saAttributes,
DWORD(OPEN_EXISTING),
DWORD(FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED),
nil
)
}
guard let childEnd, childEnd != INVALID_HANDLE_VALUE else {
throw SubprocessError(
code: .init(.asyncIOFailed("CreateFileW failed")),
underlyingError: .init(rawValue: GetLastError())
)
}
switch purpose {
case .input:
targetAccess = DWORD(GENERIC_READ)
self._readFileDescriptor = .init(childEnd, closeWhenDone: closeWhenDone)
self._writeFileDescriptor = .init(parentEnd, closeWhenDone: closeWhenDone)
case .output:
targetAccess = DWORD(GENERIC_WRITE)
self._readFileDescriptor = .init(parentEnd, closeWhenDone: closeWhenDone)
self._writeFileDescriptor = .init(childEnd, closeWhenDone: closeWhenDone)
}

return CreateFileW(
pipeNameW,
targetAccess,
0,
&saAttributes,
DWORD(OPEN_EXISTING),
DWORD(FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED),
nil
)
}
guard let childEnd, childEnd != INVALID_HANDLE_VALUE else {
throw SubprocessError(
code: .init(.asyncIOFailed("CreateFileW failed")),
underlyingError: .init(rawValue: GetLastError())
)
}
switch purpose {
case .input:
self._readFileDescriptor = .init(childEnd, closeWhenDone: closeWhenDone)
self._writeFileDescriptor = .init(parentEnd, closeWhenDone: closeWhenDone)
case .output:
self._readFileDescriptor = .init(parentEnd, closeWhenDone: closeWhenDone)
self._writeFileDescriptor = .init(childEnd, closeWhenDone: closeWhenDone)
return
}
#else
let pipe = try FileDescriptor.pipe()
Expand Down
2 changes: 1 addition & 1 deletion Sources/Subprocess/IO/AsyncIO+Darwin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ final class AsyncIO: Sendable {
dispatchIO.read(
offset: 0,
length: maxLength,
queue: .global()
queue: DispatchQueue(label: "SubprocessReadQueue")
) { done, data, error in
if error != 0 {
continuation.resume(
Expand Down