Skip to content

Commit 85613ea

Browse files
committed
[WIP] Reading I/O from processes is uninterruptible, leading to hangs
In some Linux environments and on FreeBSD, I've observed frequent hangs due to Swift Concurrency task cancellation terminating a process via the teardown sequence, while reading the output hangs forever. I expect the present issues on GitHub Actions Linux CI are due to this as well, though strangely I haven't been able to reproduce it on my local aarch64 Linux VM. I'm not sure why this doesn't seem to occur on macOS and Windows, though a likely explanation is that Subprocess's Windows implementation doesn't use DispatchIO, and Dispatch is very buggy on non-Apple platforms. I/O reading needs to be cancellable so that the library doesn't hang. This is quite readily reproducible with testCancelProcessVeryEarlyOnStressTest on FreeBSD; sleep will get zombied from the cancellation but the library will sit waiting for output forever if it managed to get into DispatchIO.read before the cancellation took effect. To solve this, (ab)use an AsyncStream to force early cancellation when the parent task is cancelled. Closes #108
1 parent 0e853a9 commit 85613ea

File tree

2 files changed

+72
-20
lines changed

2 files changed

+72
-20
lines changed

Sources/Subprocess/Configuration.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ internal enum StringOrRawBytes: Sendable, Hashable {
585585
/// automatically when done.
586586
internal struct TrackedFileDescriptor: ~Copyable {
587587
internal var closeWhenDone: Bool
588-
internal let fileDescriptor: FileDescriptor
588+
internal var fileDescriptor: FileDescriptor
589589

590590
internal init(
591591
_ fileDescriptor: FileDescriptor,
@@ -675,7 +675,7 @@ internal struct TrackedDispatchIO: ~Copyable {
675675
return
676676
}
677677
closeWhenDone = false
678-
dispatchIO.close()
678+
dispatchIO.close(flags: [.stop])
679679
}
680680

681681
deinit {

Sources/Subprocess/IO/Output.swift

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,9 @@ public struct BytesOutput: OutputProtocol {
142142
internal func captureOutput(
143143
from diskIO: consuming TrackedPlatformDiskIO?
144144
) async throws -> [UInt8] {
145-
#if os(Windows)
146-
let result = try await diskIO?.fileDescriptor.read(upToLength: self.maxSize) ?? []
147-
try diskIO?.safelyClose()
148-
return result
149-
#else
150-
let result = try await diskIO!.dispatchIO.read(upToLength: self.maxSize)
151-
try diskIO?.safelyClose()
152-
return result?.array() ?? []
153-
#endif
145+
try await diskIO!.readCancellable { diskIO in
146+
try await diskIO.read(upToLength: self.maxSize)?.array() ?? []
147+
}
154148
}
155149

156150
#if SubprocessSpan
@@ -264,15 +258,11 @@ extension OutputProtocol {
264258
if OutputType.self == Void.self {
265259
return () as! OutputType
266260
}
267-
#if os(Windows)
268-
let result = try await diskIO?.fileDescriptor.read(upToLength: self.maxSize)
269-
try diskIO?.safelyClose()
270-
return try self.output(from: result ?? [])
271-
#else
272-
let result = try await diskIO!.dispatchIO.read(upToLength: self.maxSize)
273-
try diskIO?.safelyClose()
274-
return try self.output(from: result ?? .empty)
275-
#endif
261+
262+
return try await diskIO!.readCancellable { diskIO in
263+
let result = try await diskIO.read(upToLength: self.maxSize)
264+
return try self.output(from: result ?? .empty)
265+
}
276266
}
277267
}
278268

@@ -338,3 +328,65 @@ extension DispatchData {
338328
return result ?? []
339329
}
340330
}
331+
332+
#if os(Windows)
333+
typealias PlatformIO = FileDescriptor
334+
extension Array {
335+
fileprivate var empty: Self {
336+
[]
337+
}
338+
339+
fileprivate func array() -> Self {
340+
self
341+
}
342+
}
343+
#else
344+
typealias PlatformIO = DispatchIO
345+
#endif
346+
347+
/// Runs `block` while _immediately_ responding to cancellation by throwing a `CancellationError` if the parent task is cancelled, regardless of whether `block` reacts to cancellation.
348+
fileprivate func withImmediateCancellation<T>(_ block: @escaping @Sendable () async throws -> T) async throws -> T {
349+
// (ab)use an AsyncStream to return the buffer or immediately react to cancellation
350+
let stream = AsyncThrowingStream {
351+
return try await block()
352+
}
353+
var it = stream.makeAsyncIterator()
354+
guard let next = try await it.next() else {
355+
throw CancellationError()
356+
}
357+
return next
358+
}
359+
360+
extension TrackedPlatformDiskIO {
361+
mutating func readCancellable<OutputType: Sendable>(_ block: @escaping @Sendable (PlatformIO) async throws -> OutputType) async throws -> OutputType {
362+
try await tryFinally {
363+
let io: PlatformIO
364+
#if os(Windows)
365+
io = fileDescriptor
366+
#else
367+
io = dispatchIO
368+
#endif
369+
return try await withImmediateCancellation {
370+
try await block(io)
371+
}
372+
} finally: { _ in
373+
try safelyClose()
374+
}
375+
}
376+
}
377+
378+
fileprivate func tryFinally<T: Sendable>(_ work: () async throws -> T, finally: (Error?) async throws -> ()) async throws -> T {
379+
let result: Result<T, Error>
380+
do {
381+
result = try await .success(work())
382+
} catch let e {
383+
result = .failure(e)
384+
}
385+
switch result {
386+
case .success:
387+
try await finally(nil)
388+
case let .failure(error):
389+
try await finally(error)
390+
}
391+
return try result.get()
392+
}

0 commit comments

Comments
 (0)