Introduce AsyncIO for Windows and Linux #117


Merged: 10 commits, Jul 29, 2025
2 changes: 1 addition & 1 deletion Sources/Subprocess/AsyncBufferSequence.swift
@@ -153,7 +153,7 @@ extension AsyncBufferSequence {
)
}
#else
-// Cast data to CodeUnitg type
+// Cast data to CodeUnit type
let result = buffer.withUnsafeBytes { ptr in
return ptr.withMemoryRebound(to: Encoding.CodeUnit.self) { codeUnitPtr in
return Array(codeUnitPtr)
4 changes: 2 additions & 2 deletions Sources/Subprocess/Configuration.swift
@@ -602,7 +602,7 @@ internal func _safelyClose(_ target: _CloseTarget) throws {
#if canImport(WinSDK)
case .handle(let handle):
/// Windows does not provide a “deregistration” API (the reverse of
-/// `CreateIoCompletionPort`) for handles and it it reuses HANDLE
+/// `CreateIoCompletionPort`) for handles and it reuses HANDLE
/// values once they are closed. Since we rely on the handle value
/// as the completion key for `CreateIoCompletionPort`, we should
/// remove the registration when the handle is closed to allow
@@ -688,7 +688,7 @@ internal struct IODescriptor: ~Copyable {
type: .stream,
fileDescriptor: self.platformDescriptor(),
queue: .global(),
-cleanupHandler: { error in
+cleanupHandler: { @Sendable error in
// Close the file descriptor
if shouldClose {
try? closeFd.close()
36 changes: 19 additions & 17 deletions Sources/Subprocess/IO/AsyncIO+Darwin.swift
@@ -57,7 +57,7 @@ final class AsyncIO: Sendable {
)
return
}
-if let data = data {
+if let data {
if buffer.isEmpty {
buffer = data
threading bug here, you can't capture a mutable buffer. Why does the compiler not error here?

Contributor Author

Why not? Are you suggesting we have to use Mutex here?

Contributor

First, note that this package is still building in Swift 5 language mode (we have not updated the build system's default, even when using a Swift 6 toolchain and tools version), so that can be a cause of missing diagnostics. In this case, it's because of the many missing annotations throughout Dispatch; in particular, the read handler is not marked @Sendable even though it should be based on its semantics.

I think technically this is safe because this closure is guaranteed to be called on the same serial queue and is not otherwise accessed while those blocks are executing, so there is no actual concurrent access. That said, we should express this in a way that the compiler can guarantee as safe, especially if these DispatchIO annotations get corrected.
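To illustrate the diagnostic gap being described, here is a hedged stand-in (illustrative code, not from Dispatch or this PR): when a callback parameter is not annotated `@Sendable`, the compiler has no reason to flag captures of mutable state, which is exactly why the buffer capture above compiles silently.

```swift
// A stand-in for an API whose callback *should* be @Sendable but is not
// annotated (as with DispatchIO.read's handler in the Dispatch overlay).
func onEachChunk(_ handler: @escaping (Int) -> Void) {
    handler(1)
}

var total = 0
// Compiles without complaint: nothing tells the compiler the closure may
// run on another thread, so capturing the mutable `total` is not flagged.
onEachChunk { n in total += n }

// If the parameter were declared `@escaping @Sendable (Int) -> Void`,
// the same call site would be rejected for mutating captured state.
```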

Contributor

I'd suggest you stick @Sendable on the handler here, similar to what I mentioned in #117 (comment)


> First, note that this package is still building in Swift 5 language mode

Hang on, `swift-tools-version:6.0` will make it `-swift-version 6` (unlike just invoking `swiftc` on 6.0+). Anyway, we should be in language mode 6.

> be called on the same serial queue

it's on .global() which is concurrent.

Contributor Author

I switched to a dedicated serial queue.

I agree with @jakepetroules that this is safe because all calls are made on the same serial queue. I could mark the closure as @Sendable, but that would require us to actually use an unnecessary lock. This is analogous to the pre-actor days, where you'd basically use a serial queue to make sure there's no concurrent access.
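A minimal sketch of that pre-actor pattern, with illustrative names (not code from this PR): a private serial queue funnels every access to the mutable state, so accesses never overlap even though the compiler cannot prove it, hence the `@unchecked` annotation.

```swift
import Dispatch

// @unchecked because the compiler cannot see that `buffer` is only ever
// touched on `queue`; the serial queue is what actually makes this safe.
final class SerialBuffer: @unchecked Sendable {
    private let queue = DispatchQueue(label: "subprocess.buffer.serial")
    private var buffer: [UInt8] = []

    func append(_ bytes: [UInt8]) {
        queue.async { self.buffer.append(contentsOf: bytes) }
    }

    func drain(_ body: @escaping @Sendable ([UInt8]) -> Void) {
        queue.async {
            let contents = self.buffer
            self.buffer.removeAll()
            body(contents)
        }
    }
}
```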

Contributor

> Hang on, `swift-tools-version:6.0` will make it `-swift-version 6` (unlike just invoking `swiftc` on 6.0+).

Sorry, you're right, I might've been thinking of an old bug.

> I agree with @jakepetroules that this is safe because all calls are made on the same serial queue. I could mark the closure as @Sendable, but that would require us to actually use an unnecessary lock. This is analogous to the pre-actor days, where you'd basically use a serial queue to make sure there's no concurrent access.

At the same time, it's a known issue that the closure is not marked @Sendable. Fixing that bug WILL break the build of Swift Subprocess.

So I suggest writing the read method like this, which is compliant with full Sendable checking (without any Mutex) and insulates us against a potential fix there:

internal func read(
    from dispatchIO: DispatchIO,
    upTo maxLength: Int
) async throws -> DispatchData? {
    let stream = AsyncThrowingStream<DispatchData, any Error> { continuation in
        dispatchIO.read(
            offset: 0,
            length: maxLength,
            queue: .global()
        ) { @Sendable done, data, error in
            if error != 0 {
                continuation.finish(
                    throwing: SubprocessError(
                        code: .init(.failedToReadFromSubprocess),
                        underlyingError: .init(rawValue: error)
                    )
                )
                return
            }
            if let data {
                continuation.yield(data)
            }
            if done {
                continuation.finish()
            }
        }
    }
    var buffer: DispatchData = .empty
    for try await chunk in stream {
        buffer.append(chunk)
    }
    return buffer.isEmpty ? nil : buffer
}

Contributor Author

AsyncStream uses a lock internally. This is a hot code path and it's continuously called when streaming. We really shouldn't put a lock here unless we are sure it is absolutely necessary. IMO, this is the same ~Copyable optional hack we use everywhere. The compiler isn't able to reason that even though we are sending this value to another execution context, it's done sequentially with no overlapping.

If the concern is that when Dispatch adds the @Sendable annotation, Subprocess will break, I'd rather figure out an @unchecked Sendable solution at that point than prematurely optimize now.

Of course, I could be wrong about this being safe so I'm definitely all ears. I just don't want to change our approach simply because the compiler isn't able to reason our specific use case.

Contributor

I can see your argument for performance and agree @unchecked Sendable might be a good compromise for now (unless @weissi or @FranzBusch knows something we don't w.r.t. memory safety) - would you mind throwing that annotation on the closure with a note about why it's there? That way we're insulated against future Dispatch changes and it's also explicit that we're knowingly bypassing compiler checking.
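One common shape for that compromise is a tiny `@unchecked Sendable` box around the non-Sendable state, with the safety argument carried in a comment; a hypothetical sketch, not code from this PR:

```swift
// Wraps a value so it can cross a @Sendable boundary without a lock.
// SAFETY: callers must guarantee all access happens on one serial queue
// (or is otherwise never concurrent); the compiler does not check this,
// which is exactly what @unchecked acknowledges.
final class UncheckedSendableBox<Value>: @unchecked Sendable {
    var value: Value
    init(_ value: Value) { self.value = value }
}
```

The read handler can then capture the box (a reference) instead of the raw mutable buffer, so full Sendable checking stays enabled everywhere else and a future `@Sendable` annotation on the Dispatch handler would not break the build.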

} else {
@@ -81,8 +81,8 @@ final class AsyncIO: Sendable {
to diskIO: borrowing IOChannel
) async throws -> Int {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Int, any Error>) in
-let dispatchData = span.withUnsafeBytes {
-return DispatchData(
+span.withUnsafeBytes {
+let dispatchData = DispatchData(
bytesNoCopy: $0,
deallocator: .custom(
nil,
@@ -91,12 +91,13 @@
}
)
)
-}
-self.write(dispatchData, to: diskIO) { writtenLength, error in
-if let error = error {
-continuation.resume(throwing: error)
-} else {
-continuation.resume(returning: writtenLength)
+
+self.write(dispatchData, to: diskIO) { writtenLength, error in
+if let error {
+continuation.resume(throwing: error)
+} else {
+continuation.resume(returning: writtenLength)
+}
}
}
}
@@ -108,8 +109,8 @@ final class AsyncIO: Sendable {
to diskIO: borrowing IOChannel
) async throws -> Int {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Int, any Error>) in
-let dispatchData = array.withUnsafeBytes {
-return DispatchData(
+array.withUnsafeBytes {
+let dispatchData = DispatchData(
bytesNoCopy: $0,
deallocator: .custom(
nil,
@@ -118,12 +119,13 @@
}
)
)
-}
-self.write(dispatchData, to: diskIO) { writtenLength, error in
-if let error = error {
-continuation.resume(throwing: error)
-} else {
-continuation.resume(returning: writtenLength)
+
+self.write(dispatchData, to: diskIO) { writtenLength, error in
+if let error {
+continuation.resume(throwing: error)
+} else {
+continuation.resume(returning: writtenLength)
+}
}
}
}
31 changes: 15 additions & 16 deletions Sources/Subprocess/IO/AsyncIO+Linux.swift
@@ -118,11 +118,7 @@ final class AsyncIO: Sendable {
shutdownFileDescriptor: shutdownFileDescriptor
)
let threadContext = Unmanaged.passRetained(context)
-#if os(FreeBSD) || os(OpenBSD)
-var thread: pthread_t? = nil
-#else
var thread: pthread_t = pthread_t()
-#endif
rc = pthread_create(&thread, nil, { args in
func reportError(_ error: SubprocessError) {
_registration.withLock { store in
@@ -175,11 +171,13 @@
}

// Notify the continuation
-_registration.withLock { store in
+let continuation = _registration.withLock { store -> SignalStream.Continuation? in
if let continuation = store[targetFileDescriptor] {
-continuation.yield(true)
+return continuation
}
+return nil
}
+continuation?.yield(true)
}
}

@@ -194,16 +192,10 @@
return
}

-#if os(FreeBSD) || os(OpenBSD)
-let monitorThread = thread!
-#else
-let monitorThread = thread
-#endif

let state = State(
epollFileDescriptor: epollFileDescriptor,
shutdownFileDescriptor: shutdownFileDescriptor,
-monitorThread: monitorThread
+monitorThread: thread
)
self.state = .success(state)

@@ -222,6 +214,8 @@
_ = _SubprocessCShims.write(currentState.shutdownFileDescriptor, &one, MemoryLayout<UInt64>.stride)
// Cleanup the monitor thread
pthread_join(currentState.monitorThread, nil)

this will go badly wrong if we do this more than once. shutdown should protect against that. After pthread_join the pthread_t is a dangling pointer

Contributor Author

shutdown is only called in atexit, which by definition should only run once.

close(currentState.epollFileDescriptor)
close(currentState.shutdownFileDescriptor)
}


@@ -394,7 +388,7 @@ extension AsyncIO {
resultBuffer.removeLast(resultBuffer.count - readLength)
return resultBuffer
} else {
-if errno == EAGAIN || errno == EWOULDBLOCK {
+if self.shouldWaitForNextSignal(with: errno) {
// No more data for now wait for the next signal
break
} else {
@@ -443,7 +437,7 @@ extension AsyncIO {
return writtenLength
}
} else {
-if errno == EAGAIN || errno == EWOULDBLOCK {
+if self.shouldWaitForNextSignal(with: errno) {
// No more data for now wait for the next signal
break
} else {
@@ -486,7 +480,7 @@ extension AsyncIO {
return writtenLength
}
} else {
-if errno == EAGAIN || errno == EWOULDBLOCK {
+if self.shouldWaitForNextSignal(with: errno) {
// No more data for now wait for the next signal
break
} else {
@@ -500,6 +494,11 @@ extension AsyncIO {
return 0
}
#endif

@inline(__always)
private func shouldWaitForNextSignal(with error: CInt) -> Bool {
return error == EAGAIN || error == EWOULDBLOCK || error == EINTR
}
}

extension Array : AsyncIO._ContiguousBytes where Element == UInt8 {}
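The helper added above folds EINTR in with EAGAIN/EWOULDBLOCK as "retryable". A hedged sketch of the read step it supports, with illustrative names (the real loops live in the `read`/`write` methods above): a retryable errno parks the caller until epoll reports readiness again, while any other errno is a real failure.

```swift
#if canImport(Glibc)
import Glibc

struct ReadFailure: Error { let errno: CInt }

// Simplified single read attempt against a nonblocking descriptor.
// Returns nil when the caller should wait for the next epoll signal.
func readStep(_ fd: CInt, into buffer: inout [UInt8]) throws -> Int? {
    let n = buffer.withUnsafeMutableBytes { ptr in
        read(fd, ptr.baseAddress, ptr.count)
    }
    if n >= 0 {
        return n // 0 means EOF, > 0 means bytes read
    }
    if errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR {
        return nil // no data yet (or interrupted); wait for the next signal
    }
    throw ReadFailure(errno: errno)
}
#endif
```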
97 changes: 57 additions & 40 deletions Sources/Subprocess/IO/AsyncIO+Windows.swift
@@ -19,6 +19,7 @@
@preconcurrency import SystemPackage
#endif

import _SubprocessCShims
import Synchronization
internal import Dispatch
@preconcurrency import WinSDK
@@ -71,7 +72,11 @@ final class AsyncIO: @unchecked Sendable {
// Create monitor thread
let threadContext = MonitorThreadContext(ioCompletionPort: port)
let threadContextPtr = Unmanaged.passRetained(threadContext)
-let threadHandle = CreateThread(nil, 0, { args in
+/// Microsoft documentation for `CreateThread` states:
+/// > A thread in an executable that calls the C run-time library (CRT)
+/// > should use the _beginthreadex and _endthreadex functions for
+/// > thread management rather than CreateThread and ExitThread
+let threadHandleValue = _beginthreadex(nil, 0, { args in
func reportError(_ error: SubprocessError) {
_registration.withLock { store in
for continuation in store.values {
@@ -126,18 +131,23 @@
break
}
// Notify the continuations
-_registration.withLock { store in
+let continuation = _registration.withLock { store -> SignalStream.Continuation? in
if let continuation = store[targetFileDescriptor] {
-continuation.yield(bytesTransferred)
+return continuation
}
+return nil
}
+continuation?.yield(bytesTransferred)
}
return 0
}, threadContextPtr.toOpaque(), 0, nil)
-guard let threadHandle = threadHandle else {
+guard threadHandleValue > 0,
+let threadHandle = HANDLE(bitPattern: threadHandleValue) else {
+// _beginthreadex uses errno instead of GetLastError()
+let capturedError = _subprocess_windows_get_errno()
let error = SubprocessError(
-code: .init(.asyncIOFailed("CreateThread failed")),
-underlyingError: .init(rawValue: GetLastError())
+code: .init(.asyncIOFailed("_beginthreadex failed")),
+underlyingError: .init(rawValue: capturedError)
)
self.monitorThread = .failure(error)
return
@@ -156,10 +166,10 @@
return
}
PostQueuedCompletionStatus(
-ioPort,
-0,
-shutdownPort,
-nil
+ioPort, // CompletionPort
+0, // Number of bytes transferred.
+shutdownPort, // Completion key to post status
+nil // Overlapped
)
// Wait for monitor thread to exit
WaitForSingleObject(monitorThreadHandle, INFINITE)
@@ -246,26 +256,24 @@
var signalStream = self.registerHandle(handle).makeAsyncIterator()

while true {
// We use an empty `_OVERLAPPED()` here because `ReadFile` below
// only reads non-seekable files, aka pipes.
var overlapped = _OVERLAPPED()
let succeed = try resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in
// Get a pointer to the memory at the specified offset
// Windows ReadFile uses DWORD for target count, which means we can only
// read up to DWORD (aka UInt32) max.
-let targetCount: DWORD
-if MemoryLayout<Int>.size == MemoryLayout<Int32>.size {
-// On 32 bit systems we don't have to worry about overflowing
-targetCount = DWORD(truncatingIfNeeded: bufferPointer.count - readLength)
-} else {
-// On 64 bit systems we need to cap the count at DWORD max
-targetCount = DWORD(truncatingIfNeeded: min(bufferPointer.count - readLength, Int(UInt32.max)))
-}
+let targetCount: DWORD = self.calculateRemainingCount(
+totalCount: bufferPointer.count,
+readCount: readLength
+)

let offsetAddress = bufferPointer.baseAddress!.advanced(by: readLength)
// Read directly into the buffer at the offset
return ReadFile(
handle,
offsetAddress,
-DWORD(truncatingIfNeeded: targetCount),
+targetCount,
nil,
&overlapped
)
@@ -300,7 +308,7 @@
return resultBuffer
} else {
// Read some data
-readLength += Int(bytesRead)
+readLength += Int(truncatingIfNeeded: bytesRead)
if maxLength == .max {
// Grow resultBuffer if needed
guard Double(readLength) > 0.8 * Double(resultBuffer.count) else {
@@ -333,24 +341,22 @@
var signalStream = self.registerHandle(diskIO.channel).makeAsyncIterator()
var writtenLength: Int = 0
while true {
// We use an empty `_OVERLAPPED()` here because `WriteFile` below
// only writes to non-seekable files, aka pipes.
var overlapped = _OVERLAPPED()
let succeed = try span.withUnsafeBytes { ptr in
// Windows WriteFile uses DWORD for target count
// which means we can only write up to DWORD max
-let remainingLength: DWORD
-if MemoryLayout<Int>.size == MemoryLayout<Int32>.size {
-// On 32 bit systems we don't have to worry about overflowing
-remainingLength = DWORD(truncatingIfNeeded: ptr.count - writtenLength)
-} else {
-// On 64 bit systems we need to cap the count at DWORD max
-remainingLength = DWORD(truncatingIfNeeded: min(ptr.count - writtenLength, Int(DWORD.max)))
-}
+let remainingLength: DWORD = self.calculateRemainingCount(
+totalCount: ptr.count,
+readCount: writtenLength
+)

let startPtr = ptr.baseAddress!.advanced(by: writtenLength)
return WriteFile(
handle,
startPtr,
-DWORD(truncatingIfNeeded: remainingLength),
+remainingLength,
nil,
&overlapped
)
@@ -371,7 +377,7 @@
// Now wait for read to finish
let bytesWritten: DWORD = try await signalStream.next() ?? 0

-writtenLength += Int(bytesWritten)
+writtenLength += Int(truncatingIfNeeded: bytesWritten)
if writtenLength >= span.byteCount {
return writtenLength
}
@@ -387,23 +393,21 @@
var signalStream = self.registerHandle(diskIO.channel).makeAsyncIterator()
var writtenLength: Int = 0
while true {
// We use an empty `_OVERLAPPED()` here because `WriteFile` below
// only writes to non-seekable files, aka pipes.
var overlapped = _OVERLAPPED()
let succeed = try bytes.withUnsafeBytes { ptr in
// Windows WriteFile uses DWORD for target count
// which means we can only write up to DWORD max
-let remainingLength: DWORD
-if MemoryLayout<Int>.size == MemoryLayout<Int32>.size {
-// On 32 bit systems we don't have to worry about overflowing
-remainingLength = DWORD(truncatingIfNeeded: ptr.count - writtenLength)
-} else {
-// On 64 bit systems we need to cap the count at DWORD max
-remainingLength = DWORD(truncatingIfNeeded: min(ptr.count - writtenLength, Int(DWORD.max)))
-}
+let remainingLength: DWORD = self.calculateRemainingCount(
+totalCount: ptr.count,
+readCount: writtenLength
+)
let startPtr = ptr.baseAddress!.advanced(by: writtenLength)
return WriteFile(
handle,
startPtr,
-DWORD(truncatingIfNeeded: remainingLength),
+remainingLength,
nil,
&overlapped
)
@@ -423,12 +427,25 @@
}
// Now wait for read to finish
let bytesWritten: DWORD = try await signalStream.next() ?? 0
-writtenLength += Int(bytesWritten)
+writtenLength += Int(truncatingIfNeeded: bytesWritten)
if writtenLength >= bytes.count {
return writtenLength
}
}
}

// Windows ReadFile uses DWORD for target count, which means we can only
// read up to DWORD (aka UInt32) max.
private func calculateRemainingCount(totalCount: Int, readCount: Int) -> DWORD {
// We support both 32bit and 64bit systems for Windows
if MemoryLayout<Int>.size == MemoryLayout<Int32>.size {
// On 32 bit systems we don't have to worry about overflowing
return DWORD(truncatingIfNeeded: totalCount - readCount)
} else {
// On 64 bit systems we need to cap the count at DWORD max
return DWORD(truncatingIfNeeded: min(totalCount - readCount, Int(DWORD.max)))
}
}
}

extension Array : AsyncIO._ContiguousBytes where Element == UInt8 {}
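The capping that `calculateRemainingCount` performs exists because Win32 `ReadFile`/`WriteFile` take a `DWORD` (`UInt32`) byte count, so a single call can move at most `UInt32.max` bytes and the caller must loop for the rest. A portable illustration (not code from the PR):

```swift
// Illustrative mirror of the clamping: Swift's clamping initializer
// performs the same DWORD capping without branching on pointer width.
func clampedCount(totalCount: Int, doneCount: Int) -> UInt32 {
    UInt32(clamping: totalCount - doneCount)
}

// clampedCount(totalCount: 10, doneCount: 4) == 6
// With a 5 GiB remainder on a 64-bit build, the result clamps to
// UInt32.max, so the surrounding loop issues further calls.
```

`UInt32(clamping:)` would be a branch-free alternative to the explicit `MemoryLayout` size check in the diff above.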