diff --git a/Package.swift b/Package.swift index 479975e..78f88e9 100644 --- a/Package.swift +++ b/Package.swift @@ -76,9 +76,22 @@ let package = Package( ] ), .systemLibrary(name: "SystemSQLite", pkgConfig: "sqlite3"), + + // `AsyncProcess` modules and dependencies + + .target(name: "CProcessSpawnSync"), + .target( + name: "ProcessSpawnSync", + dependencies: [ + "CProcessSpawnSync", + .product(name: "Atomics", package: "swift-atomics"), + .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), + ] + ), .target( name: "AsyncProcess", dependencies: [ + "ProcessSpawnSync", .product(name: "Atomics", package: "swift-atomics"), .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), .product(name: "Logging", package: "swift-log"), diff --git a/Sources/AsyncProcess/ChunkSequence.swift b/Sources/AsyncProcess/ChunkSequence.swift index fe60529..f36d0d1 100644 --- a/Sources/AsyncProcess/ChunkSequence.swift +++ b/Sources/AsyncProcess/ChunkSequence.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -33,17 +33,17 @@ public struct ChunkSequence: AsyncSequence & Sendable { public func makeAsyncIterator() -> AsyncIterator { // This will close the file handle. - AsyncIterator(try! self.fileHandle?.fileContentStream(eventLoop: self.group.any())) + return AsyncIterator(try! self.fileHandle?.fileContentStream(eventLoop: group.any())) } public typealias Element = ByteBuffer public struct AsyncIterator: AsyncIteratorProtocol { public typealias Element = ByteBuffer - typealias UnderlyingSequence = FileContentStream + internal typealias UnderlyingSequence = FileContentStream private var underlyingIterator: UnderlyingSequence.AsyncIterator? - init(_ underlyingSequence: UnderlyingSequence?) { + internal init(_ underlyingSequence: UnderlyingSequence?) { self.underlyingIterator = underlyingSequence?.makeAsyncIterator() } diff --git a/Sources/AsyncProcess/EOFSequence.swift b/Sources/AsyncProcess/EOFSequence.swift index cc170fe..341dcc4 100644 --- a/Sources/AsyncProcess/EOFSequence.swift +++ b/Sources/AsyncProcess/EOFSequence.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -15,13 +15,13 @@ public struct EOFSequence: AsyncSequence & Sendable { public struct AsyncIterator: AsyncIteratorProtocol { public mutating func next() async throws -> Element? { - nil + return nil } } public init(of type: Element.Type = Element.self) {} public func makeAsyncIterator() -> AsyncIterator { - AsyncIterator() + return AsyncIterator() } } diff --git a/Sources/AsyncProcess/FileContentStream.swift b/Sources/AsyncProcess/FileContentStream.swift index 4999b8f..50e1cf4 100644 --- a/Sources/AsyncProcess/FileContentStream.swift +++ b/Sources/AsyncProcess/FileContentStream.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -19,12 +19,13 @@ import NIO // - Known issues: // - no tests // - most configurations have never run -struct FileContentStream: AsyncSequence { +internal typealias FileContentStream = _FileContentStream +public struct _FileContentStream: AsyncSequence & Sendable { public typealias Element = ByteBuffer typealias Underlying = AsyncThrowingChannel public func makeAsyncIterator() -> AsyncIterator { - AsyncIterator(underlying: self.asyncChannel.makeAsyncIterator()) + return AsyncIterator(underlying: self.asyncChannel.makeAsyncIterator()) } public struct AsyncIterator: AsyncIteratorProtocol { @@ -33,7 +34,7 @@ struct FileContentStream: AsyncSequence { var underlying: Underlying.AsyncIterator public mutating func next() async throws -> ByteBuffer? { - try await self.underlying.next() + return try await self.underlying.next() } } @@ -41,13 +42,23 @@ struct FileContentStream: AsyncSequence { public var errnoValue: CInt public static func makeFromErrnoGlobal() -> IOError { - IOError(errnoValue: errno) + return IOError(errnoValue: errno) } } private let asyncChannel: AsyncThrowingChannel - public init( + public static func makeReader( + fileDescriptor: CInt, + eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(), + blockingPool: NIOThreadPool = .singleton + ) async throws -> _FileContentStream { + return try await eventLoop.submit { + try FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool) + }.get() + } + + internal init( fileDescriptor: CInt, eventLoop: EventLoop, blockingPool: NIOThreadPool? = nil @@ -64,7 +75,7 @@ struct FileContentStream: AsyncSequence { switch statInfo.st_mode & S_IFMT { case S_IFREG: - guard let blockingPool else { + guard let blockingPool = blockingPool else { throw IOError(errnoValue: EINVAL) } let fileHandle = NIOLoopBound( @@ -86,7 +97,7 @@ struct FileContentStream: AsyncSequence { .whenComplete { result in try! fileHandle.value.close() switch result { - case let .failure(error): + case .failure(let error): asyncChannel.fail(error) case .success: asyncChannel.finish() @@ -140,7 +151,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler { return data case .error: return nil - case var .sending(queue): + case .sending(var queue): queue.append(data) self = .sending(queue) return nil @@ -153,7 +164,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler { preconditionFailure("didSendOne during .idle") case .error: return nil - case var .sending(queue): + case .sending(var queue): if queue.isEmpty { self = .idle return nil @@ -212,7 +223,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler { eventLoop.makeFutureWithTask { // note: We're _not_ on an EventLoop thread here switch data { - case let .chunk(data): + case .chunk(let data): await sink.send(data) case .finish: sink.finish() @@ -255,10 +266,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler { extension FileHandle { func fileContentStream(eventLoop: EventLoop) throws -> FileContentStream { - let asyncBytes = try FileContentStream( - fileDescriptor: self.fileDescriptor, - eventLoop: eventLoop - ) + let asyncBytes = try FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop) try self.close() return asyncBytes } @@ -266,7 +274,7 @@ extension FileHandle { extension FileContentStream { var lines: AsyncByteBufferLineSequence { - AsyncByteBufferLineSequence( + return AsyncByteBufferLineSequence( self, dropTerminator: true, maximumAllowableBufferSize: 1024 * 1024, @@ -281,7 +289,7 @@ extension AsyncSequence where Element == ByteBuffer, Self: Sendable { maximumAllowableBufferSize: Int = 1024 * 1024, dropLastChunkIfNoNewline: Bool = false ) -> AsyncByteBufferLineSequence { - AsyncByteBufferLineSequence( + return AsyncByteBufferLineSequence( self, dropTerminator: dropTerminator, maximumAllowableBufferSize: maximumAllowableBufferSize, @@ -290,7 +298,7 @@ extension AsyncSequence where Element == ByteBuffer, Self: Sendable { } public var strings: AsyncMapSequence { - self.map { String(buffer: $0) } + return self.map { String(buffer: $0) } } } @@ -312,7 +320,7 @@ where Base: AsyncSequence, Base.Element == ByteBuffer { struct Buffer { private var buffer: [ByteBuffer] = [] - private(set) var byteCount: Int = 0 + internal private(set) var byteCount: Int = 0 mutating func append(_ buffer: ByteBuffer) { self.buffer.append(buffer) @@ -320,20 +328,18 @@ where Base: AsyncSequence, Base.Element == ByteBuffer { } func allButLast() -> ArraySlice { - self.buffer.dropLast() + return self.buffer.dropLast() } var byteCountButLast: Int { - self.byteCount - (self.buffer.last?.readableBytes ?? 0) + return self.byteCount - (self.buffer.last?.readableBytes ?? 0) } var lastChunkView: ByteBufferView? { - self.buffer.last?.readableBytesView + return self.buffer.last?.readableBytesView } - mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int) - -> ByteBuffer - { + mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int) -> ByteBuffer { var output = ByteBuffer() output.reserveCapacity(lastLength + self.byteCountButLast) @@ -359,7 +365,7 @@ where Base: AsyncSequence, Base.Element == ByteBuffer { } } - init( + internal init( underlying: Base.AsyncIterator, dropTerminator: Bool, maximumAllowableBufferSize: Int, @@ -446,7 +452,7 @@ where Base: AsyncSequence, Base.Element == ByteBuffer { } public func makeAsyncIterator() -> AsyncIterator { - AsyncIterator( + return AsyncIterator( underlying: self.underlying.makeAsyncIterator(), dropTerminator: self.dropTerminator, maximumAllowableBufferSize: self.maximumAllowableBufferSize, diff --git a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift index cb36f74..cbf4120 100644 --- a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift +++ b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information diff --git a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift index 59e0302..f91e6f9 100644 --- a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift +++ b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -34,7 +34,7 @@ public struct OutputLoggingSettings: Sendable { self.to = to } - func logMessage(line: String) -> Logger.Message { + internal func logMessage(line: String) -> Logger.Message { switch self.to { case .logMessage: return "\(line)" @@ -43,7 +43,7 @@ public struct OutputLoggingSettings: Sendable { } } - func metadata(stream: ProcessOutputStream, line: String) -> Logger.Metadata { + internal func metadata(stream: ProcessOutputStream, line: String) -> Logger.Metadata { switch self.to { case .logMessage: return ["stream": "\(stream.description)"] @@ -63,18 +63,18 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` - /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you - /// don't want to + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you don't want to /// provide input. /// - logger: Where to log diagnostic messages to (default to no where) public static func run( group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], + spawnOptions: SpawnOptions = .default, standardInput: StandardInput, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { let p = Self( @@ -82,9 +82,11 @@ extension ProcessExecutor { executable: executable, arguments, environment: environment, + spawnOptions: spawnOptions, standardInput: standardInput, standardOutput: .discard, standardError: .discard, + teardownSequence: teardownSequence, logger: logger ) return try await p.run() @@ -99,10 +101,8 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` - /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you - /// don't want to + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you don't want to /// provide input. /// - logger: Where to log diagnostic and output messages to /// - logConfiguration: How to log the output lines @@ -112,6 +112,8 @@ extension ProcessExecutor { _ arguments: [String], standardInput: StandardInput, environment: [String: String] = [:], + spawnOptions: SpawnOptions = .default, + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger, logConfiguration: OutputLoggingSettings ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { @@ -120,17 +122,17 @@ extension ProcessExecutor { executable: executable, arguments, environment: environment, + spawnOptions: spawnOptions, standardInput: standardInput, standardOutput: .stream, standardError: .stream, + teardownSequence: teardownSequence, logger: logger ) return try await withThrowingTaskGroup(of: ProcessExitReason?.self) { group in group.addTask { for try await (stream, line) in await merge( - exe.standardOutput.splitIntoLines().strings.map { - (ProcessOutputStream.standardOutput, $0) - }, + exe.standardOutput.splitIntoLines().strings.map { (ProcessOutputStream.standardOutput, $0) }, exe.standardError.splitIntoLines().strings.map { (ProcessOutputStream.standardError, $0) } ) { logger.log( @@ -143,7 +145,7 @@ extension ProcessExecutor { } group.addTask { - try await exe.run() + return try await exe.run() } while let next = try await group.next() { @@ -164,23 +166,22 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` - /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you - /// don't want to + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you don't want to /// provide input. /// - outputProcessor: The closure that'll be called for every chunk of output - /// - splitOutputIntoLines: Whether to call the closure with full lines (`true`) or arbitrary chunks of output - /// (`false`) + /// - splitOutputIntoLines: Whether to call the closure with full lines (`true`) or arbitrary chunks of output (`false`) /// - logger: Where to log diagnostic and output messages to public static func runProcessingOutput( group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], + spawnOptions: SpawnOptions = .default, standardInput: StandardInput, outputProcessor: @escaping @Sendable (ProcessOutputStream, ByteBuffer) async throws -> Void, splitOutputIntoLines: Bool = false, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { let exe = ProcessExecutor( @@ -188,9 +189,11 @@ extension ProcessExecutor { executable: executable, arguments, environment: environment, + spawnOptions: spawnOptions, standardInput: standardInput, standardOutput: .stream, standardError: .stream, + teardownSequence: teardownSequence, logger: logger ) return try await withThrowingTaskGroup(of: ProcessExitReason?.self) { group in @@ -215,7 +218,7 @@ extension ProcessExecutor { } group.addTask { - try await exe.run() + return try await exe.run() } while let next = try await group.next() { @@ -252,25 +255,23 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` - /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you - /// don't want to + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you don't want to /// provide input. - /// - collectStandardOutput: If `true`, collect all of the child process' standard output into memory, discard if - /// `false` - /// - collectStandardError: If `true`, collect all of the child process' standard error into memory, discard if - /// `false` + /// - collectStandardOutput: If `true`, collect all of the child process' standard output into memory, discard if `false` + /// - collectStandardError: If `true`, collect all of the child process' standard error into memory, discard if `false` /// - logger: Where to log diagnostic and output messages to public static func runCollectingOutput( group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], + spawnOptions: SpawnOptions = .default, standardInput: StandardInput, collectStandardOutput: Bool, collectStandardError: Bool, perStreamCollectionLimitBytes: Int = 128 * 1024, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReasonAndOutput where StandardInput.Element == ByteBuffer { let exe = ProcessExecutor( @@ -278,9 +279,11 @@ extension ProcessExecutor { executable: executable, arguments, environment: environment, + spawnOptions: spawnOptions, standardInput: standardInput, standardOutput: collectStandardOutput ? .stream : .discard, standardError: collectStandardError ? .stream : .discard, + teardownSequence: teardownSequence, logger: logger ) @@ -289,9 +292,7 @@ extension ProcessExecutor { if collectStandardOutput { var output: ByteBuffer? = nil for try await chunk in await exe.standardOutput { - guard - (output?.readableBytes ?? 0) + chunk.readableBytes <= perStreamCollectionLimitBytes - else { + guard (output?.readableBytes ?? 0) + chunk.readableBytes <= perStreamCollectionLimitBytes else { throw TooMuchProcessOutputError(stream: .standardOutput) } output.setOrWriteImmutableBuffer(chunk) @@ -306,9 +307,7 @@ extension ProcessExecutor { if collectStandardError { var output: ByteBuffer? = nil for try await chunk in await exe.standardError { - guard - (output?.readableBytes ?? 0) + chunk.readableBytes <= perStreamCollectionLimitBytes - else { + guard (output?.readableBytes ?? 0) + chunk.readableBytes <= perStreamCollectionLimitBytes else { throw TooMuchProcessOutputError(stream: .standardError) } output.setOrWriteImmutableBuffer(chunk) @@ -320,21 +319,17 @@ extension ProcessExecutor { } group.addTask { - try await .exitReason(exe.run()) + return .exitReason(try await exe.run()) } - var allInfo = ProcessExitReasonAndOutput( - exitReason: .exit(-1), - standardOutput: nil, - standardError: nil - ) + var allInfo = ProcessExitReasonAndOutput(exitReason: .exit(-1), standardOutput: nil, standardError: nil) while let next = try await group.next() { switch next { - case let .exitReason(exitReason): + case .exitReason(let exitReason): allInfo.exitReason = exitReason - case let .standardOutput(output): + case .standardOutput(let output): allInfo.standardOutput = output - case let .standardError(output): + case .standardError(let output): allInfo.standardError = output } } @@ -353,22 +348,25 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` /// - logger: Where to log diagnostic messages to (default to no where) public static func run( group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], environment: [String: String] = [:], + spawnOptions: SpawnOptions = .default, + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReason { - try await self.run( + return try await Self.run( group: group, executable: executable, arguments, + spawnOptions: spawnOptions, standardInput: EOFSequence(), environment: environment, + teardownSequence: teardownSequence, logger: logger ) } @@ -382,8 +380,7 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` /// - logger: Where to log diagnostic and output messages to /// - logConfiguration: How to log the output lines public static func runLogOutput( @@ -391,15 +388,19 @@ extension ProcessExecutor { executable: String, _ arguments: [String], environment: [String: String] = [:], + spawnOptions: SpawnOptions = .default, + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger, logConfiguration: OutputLoggingSettings ) async throws -> ProcessExitReason { - try await self.runLogOutput( + return try await Self.runLogOutput( group: group, executable: executable, arguments, standardInput: EOFSequence(), environment: environment, + spawnOptions: spawnOptions, + teardownSequence: teardownSequence, logger: logger, logConfiguration: logConfiguration ) @@ -414,29 +415,31 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` /// - outputProcessor: The closure that'll be called for every chunk of output - /// - splitOutputIntoLines: Whether to call the closure with full lines (`true`) or arbitrary chunks of output - /// (`false`) + /// - splitOutputIntoLines: Whether to call the closure with full lines (`true`) or arbitrary chunks of output (`false`) /// - logger: Where to log diagnostic and output messages to public static func runProcessingOutput( group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], + spawnOptions: SpawnOptions = .default, outputProcessor: @escaping @Sendable (ProcessOutputStream, ByteBuffer) async throws -> Void, splitOutputIntoLines: Bool = false, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReason { - try await self.runProcessingOutput( + return try await Self.runProcessingOutput( group: group, executable: executable, arguments, + spawnOptions: spawnOptions, standardInput: EOFSequence(), outputProcessor: outputProcessor, splitOutputIntoLines: splitOutputIntoLines, environment: environment, + teardownSequence: teardownSequence, logger: logger ) } @@ -450,32 +453,33 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` - /// - collectStandardOutput: If `true`, collect all of the child process' standard output into memory, discard if - /// `false` - /// - collectStandardError: If `true`, collect all of the child process' standard error into memory, discard if - /// `false` + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` + /// - collectStandardOutput: If `true`, collect all of the child process' standard output into memory, discard if `false` + /// - collectStandardError: If `true`, collect all of the child process' standard error into memory, discard if `false` /// - logger: Where to log diagnostic and output messages to public static func runCollectingOutput( group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], + spawnOptions: SpawnOptions = .default, collectStandardOutput: Bool, collectStandardError: Bool, perStreamCollectionLimitBytes: Int = 128 * 1024, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReasonAndOutput { - try await self.runCollectingOutput( + return try await Self.runCollectingOutput( group: group, executable: executable, arguments, + spawnOptions: spawnOptions, standardInput: EOFSequence(), collectStandardOutput: collectStandardOutput, collectStandardError: collectStandardError, perStreamCollectionLimitBytes: perStreamCollectionLimitBytes, environment: environment, + teardownSequence: teardownSequence, logger: logger ) } diff --git a/Sources/AsyncProcess/ProcessExecutor.swift b/Sources/AsyncProcess/ProcessExecutor.swift index 4fe211d..45bb831 100644 --- a/Sources/AsyncProcess/ProcessExecutor.swift +++ b/Sources/AsyncProcess/ProcessExecutor.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -10,20 +10,40 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import Atomics -import Foundation import Logging import NIO +import ProcessSpawnSync @_exported import struct SystemPackage.FileDescriptor +#if os(Linux) || ASYNC_PROCESS_FORCE_PS_PROCESS + // Foundation.Process is too buggy on Linux + // + // - Foundation.Process on Linux throws error Error Domain=NSCocoaErrorDomain Code=256 "(null)" if executable not found + // https://github.com/swiftlang/swift-corelibs-foundation/issues/4810 + // - Foundation.Process on Linux doesn't correctly detect when child process dies (creating zombie processes) + // https://github.com/swiftlang/swift-corelibs-foundation/issues/4795 + // - Foundation.Process on Linux seems to inherit the Process.run()-calling thread's signal mask, even SIGTERM blocked + // https://github.com/swiftlang/swift-corelibs-foundation/issues/4772 + typealias Process = PSProcess +#endif + +#if os(iOS) || os(tvOS) || os(watchOS) + // Note: Process() in iOS/tvOS/watchOS is available in internal builds only under Foundation Private/headers + import Foundation_Private.NSTask +#else + import Foundation +#endif + public struct ProcessOutputStream: Sendable & Hashable & CustomStringConvertible { - enum Backing { + internal enum Backing { case standardOutput case standardError } - var backing: Backing + internal var backing: Backing public static let standardOutput: Self = .init(backing: .standardOutput) @@ -41,15 +61,14 @@ public struct ProcessOutputStream: Sendable & Hashable & CustomStringConvertible /// What to do with a given stream (`stdout`/`stderr`) in the spawned child process. public struct ProcessOutput: Sendable { - enum Backing { + internal enum Backing { case discard case inherit case fileDescriptorOwned(FileDescriptor) case fileDescriptorShared(FileDescriptor) case stream } - - var backing: Backing + internal var backing: Backing /// Discard the child process' output. /// @@ -66,7 +85,7 @@ public struct ProcessOutput: Sendable { /// /// - warning: After passing a `FileDescriptor` to this method you _must not_ perform _any_ other operations on it. public static func fileDescriptor(takingOwnershipOf fd: FileDescriptor) -> Self { - .init(backing: .fileDescriptorOwned(fd)) + return .init(backing: .fileDescriptorOwned(fd)) } /// Install `fd` as the child process' file descriptor, leaving the fd ownership with the user. @@ -76,7 +95,7 @@ public struct ProcessOutput: Sendable { /// /// - note: `fd` is required to be closed by the user after the process has started running (and _not_ before). public static func fileDescriptor(sharing fd: FileDescriptor) -> Self { - .init(backing: .fileDescriptorShared(fd)) + return .init(backing: .fileDescriptorShared(fd)) } /// Stream this using the ``ProcessExecutor.standardOutput`` / ``ProcessExecutor.standardError`` ``AsyncStream``s. @@ -101,19 +120,19 @@ private struct OutputConsumptionState: OptionSet { static let stderrNotStreamed: Self = .init(rawValue: 0b1000) var hasStandardOutputBeenConsumed: Bool { - self.contains([.stdoutConsumed]) + return self.contains([.stdoutConsumed]) } var hasStandardErrorBeenConsumed: Bool { - self.contains([.stderrConsumed]) + return self.contains([.stderrConsumed]) } var isStandardOutputStremed: Bool { - !self.contains([.stdoutNotStreamed]) + return !self.contains([.stdoutNotStreamed]) } var isStandardErrorStremed: Bool { - !self.contains([.stderrNotStreamed]) + return !self.contains([.stderrNotStreamed]) } } @@ -158,14 +177,47 @@ public final actor ProcessExecutor { private let standardErrorWriteHandle: FileHandle? private let _standardOutput: ChunkSequence private let _standardError: ChunkSequence - private let processIsRunningApproximation = ManagedAtomic( - RunningStateApproximation.neverStarted.rawValue - ) + private let processIsRunningApproximation = ManagedAtomic(RunningStateApproximation.neverStarted.rawValue) private let processOutputConsumptionApproximation = ManagedAtomic(UInt8(0)) private let processPid = ManagedAtomic(pid_t(0)) private let ownsStandardOutputWriteHandle: Bool private let ownsStandardErrorWriteHandle: Bool private let teardownSequence: TeardownSequence + private let spawnOptions: SpawnOptions + + public static var isBackedByPSProcess: Bool { + return Process.self == PSProcess.self + } + + public struct SpawnOptions: Sendable { + /// Should we close all non-stdin/out/err file descriptors in the child? + /// + /// The default and safe option is `true` but on Linux this incurs a performance penalty unless you have + /// a new-enough Glibc & Linux that support the + /// [`close_range`](https://man7.org/linux/man-pages/man2/close_range.2.html) syscall. + /// + /// On Darwin, `false` is only supported if you compile with `-Xswiftc -DASYNC_PROCESS_FORCE_PS_PROCESS`, + /// otherwise it will be silently ignored (and the other file descriptors will be closed anyway.). + public var closeOtherFileDescriptors: Bool + + /// Change the working directory of the child process to this directory. + public var changedWorkingDirectory: Optional + + /// Should we call `setsid()` in the child process? + /// + /// Not supported on Darwin, unless you compile with `-Xswiftc -DASYNC_PROCESS_FORCE_PS_PROCESS`, otherwise + /// it will be silently ignored (and no new session will be created). + public var createNewSession: Bool + + /// Safe & sensible default options. + public static var `default`: SpawnOptions { + return SpawnOptions( + closeOtherFileDescriptors: true, + changedWorkingDirectory: nil, + createNewSession: false + ) + } + } public struct OSError: Error & Sendable & Hashable { public var errnoNumber: CInt @@ -179,13 +231,13 @@ public final actor ProcessExecutor { public typealias ArrayLiteralElement = TeardownStep public init(arrayLiteral elements: TeardownStep...) { - self.steps = (elements.map(\.backing)) + [.kill] + self.steps = (elements.map { $0.backing }) + [.kill] } public struct TeardownStep: Sendable { var backing: Backing - enum Backing { + internal enum Backing { case sendSignal(CInt, allowedTimeNS: UInt64) case kill } @@ -193,14 +245,13 @@ public final actor ProcessExecutor { /// Send `signal` to process and give it `allowedTimeToExitNS` nanoseconds to exit before progressing /// to the next teardown step. The final teardown step is always sending a `SIGKILL`. public static func sendSignal(_ signal: CInt, allowedTimeToExitNS: UInt64) -> Self { - Self(backing: .sendSignal(signal, allowedTimeNS: allowedTimeToExitNS)) + return Self(backing: .sendSignal(signal, allowedTimeNS: allowedTimeToExitNS)) } } - var steps: [TeardownStep.Backing] = [.kill] public var description: String { - self.steps.map { "\($0)" }.joined(separator: ", ") + return self.steps.map { "\($0)" }.joined(separator: ", ") } } @@ -243,16 +294,12 @@ public final actor ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` - /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you - /// don't want to + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you don't want to /// provide input. - /// - standardOutput: A description of what to do with the standard output of the child process (defaults to - /// ``ProcessOutput/stream`` + /// - standardOutput: A description of what to do with the standard output of the child process (defaults to ``ProcessOutput/stream`` /// which requires to consume it via ``ProcessExecutor/standardOutput``. - /// - standardError: A description of what to do with the standard output of the child process (defaults to - /// ``ProcessOutput/stream`` + /// - standardError: A description of what to do with the standard output of the child process (defaults to ``ProcessOutput/stream`` /// which requires to consume it via ``ProcessExecutor/standardError``. /// - teardownSequence: What to do if ``ProcessExecutor`` needs to tear down the process abruptly /// (usually because of Swift Concurrency cancellation) @@ -262,6 +309,7 @@ public final actor ProcessExecutor { executable: String, _ arguments: [String], environment: [String: String] = [:], + spawnOptions: SpawnOptions = .default, standardInput: StandardInput, standardOutput: ProcessOutput = .stream, standardError: ProcessOutput = .stream, @@ -275,6 +323,7 @@ public final actor ProcessExecutor { self.standardInput = AnyAsyncSequence(standardInput) self.logger = logger self.teardownSequence = teardownSequence + self.spawnOptions = spawnOptions self.standardInputPipe = StandardInput.self == EOFSequence.self ? nil : Pipe() @@ -287,7 +336,7 @@ public final actor ProcessExecutor { self.ownsStandardOutputWriteHandle = true self.standardOutputWriteHandle = FileHandle(forWritingAtPath: "/dev/null") self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) - case let .fileDescriptorOwned(fd): + case .fileDescriptorOwned(let fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed @@ -295,7 +344,7 @@ public final actor ProcessExecutor { self.ownsStandardOutputWriteHandle = true self.standardOutputWriteHandle = FileHandle(fileDescriptor: fd.rawValue) self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) - case let .fileDescriptorShared(fd): + case .fileDescriptorShared(let fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed @@ -327,7 +376,7 @@ public final actor ProcessExecutor { self.ownsStandardErrorWriteHandle = true self.standardErrorWriteHandle = FileHandle(forWritingAtPath: "/dev/null") self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) - case let .fileDescriptorOwned(fd): + case .fileDescriptorOwned(let fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed @@ -335,7 +384,7 @@ public final actor ProcessExecutor { self.ownsStandardErrorWriteHandle = true self.standardErrorWriteHandle = FileHandle(fileDescriptor: fd.rawValue) self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) - case let .fileDescriptorShared(fd): + case .fileDescriptorShared(let fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed @@ -434,7 +483,7 @@ public final actor ProcessExecutor { } let stepCompletion: TeardownStepCompletion switch step { - case let .sendSignal(signal, allowedTimeNS): + case .sendSignal(let signal, let allowedTimeNS): stepCompletion = await withTaskGroup(of: TeardownStepCompletion.self) { group in group.addTask { do { @@ -473,12 +522,10 @@ public final actor ProcessExecutor { /// /// Calling `run()` will run the (sub-)process and return its ``ProcessExitReason`` when the execution completes. /// Unless `standardOutput` and `standardError` were both set to ``ProcessOutput/discard``, - /// ``ProcessOutput/fileDescriptor(takingOwnershipOf:)`` or ``ProcessOutput/inherit`` you must consume the - /// `AsyncSequence`s + /// ``ProcessOutput/fileDescriptor(takingOwnershipOf:)`` or ``ProcessOutput/inherit`` you must consume the `AsyncSequence`s /// ``ProcessExecutor/standardOutput`` and ``ProcessExecutor/standardError`` concurrently to ``run()``ing the process. /// - /// If you prefer to get the standard output and error in one (non-stremed) piece upon exit, consider the `static` - /// methods such as + /// If you prefer to get the standard output and error in one (non-stremed) piece upon exit, consider the `static` methods such as /// ``ProcessExecutor/runCollectingOutput(group:executable:_:standardInput:collectStandardOutput:collectStandardError:perStreamCollectionLimitBytes:environment:logger:)``. public func run() async throws -> ProcessExitReason { let p = Process() @@ -494,6 +541,19 @@ public final actor ProcessExecutor { p.arguments = self.arguments p.environment = self.environment p.standardInput = nil + func isTypeOf(_ existing: Existing, type: New.Type) -> New? { + return existing as? New + } + if let newCWD = self.spawnOptions.changedWorkingDirectory { + p.currentDirectoryURL = URL.init(fileURLWithPath: newCWD) + } + if let pSpecial = isTypeOf(p, type: PSProcess.self) { + assert(Self.isBackedByPSProcess) + pSpecial._closeOtherFileDescriptors = self.spawnOptions.closeOtherFileDescriptors + pSpecial._createNewSession = self.spawnOptions.createNewSession + } else { + assert(!Self.isBackedByPSProcess) + } if let standardOutputWriteHandle = self.standardOutputWriteHandle { // NOTE: Do _NOT_ remove this if. Setting this to `nil` is different to not setting it at all! @@ -510,12 +570,22 @@ public final actor ProcessExecutor { ) p.terminationHandler = { p in - let pidExchangeWorked = self.processPid.compareExchange( - expected: p.processIdentifier, - desired: -1, - ordering: .sequentiallyConsistent - ).exchanged - assert(pidExchangeWorked) + let pProcessID = p.processIdentifier + var terminationPidExchange: (exchanged: Bool, original: pid_t) = (false, -1) + while !terminationPidExchange.exchanged { + terminationPidExchange = self.processPid.compareExchange( + expected: pProcessID, + desired: -1, + ordering: .sequentiallyConsistent + ) + if !terminationPidExchange.exchanged { + precondition( + terminationPidExchange.original == 0, + "termination pid exchange failed: \(terminationPidExchange)" + ) + Thread.sleep(forTimeInterval: 0.01) + } + } self.logger.debug( "finished running command", metadata: [ @@ -552,34 +622,46 @@ public final actor ProcessExecutor { worked, "Did you run() twice? That's currently not allowed: illegal running state \(original)" ) - do { - try p.run() - } catch { - let (worked, original) = self.processIsRunningApproximation.compareExchange( - expected: RunningStateApproximation.running.rawValue, - desired: RunningStateApproximation.finishedExecuting.rawValue, - ordering: .relaxed - ) - terminationStreamProducer.finish() // The termination handler will never have fired. - assert(worked) // We just set it to running above, shouldn't be able to race (no `await`). - assert(original == RunningStateApproximation.running.rawValue) // We compare-and-exchange it. - throw error + let childPid: pid_t = try await withCheckedThrowingContinuation { continuation in + DispatchQueue.global().async { + do { + try p.run() + let childPid = p.processIdentifier + assert(childPid > 0) + continuation.resume(returning: childPid) + } catch { + let (worked, original) = self.processIsRunningApproximation.compareExchange( + expected: RunningStateApproximation.running.rawValue, + desired: RunningStateApproximation.finishedExecuting.rawValue, + ordering: .relaxed + ) + terminationStreamProducer.finish() // The termination handler will never have fired. + if self.ownsStandardOutputWriteHandle { + try! self.standardOutputWriteHandle?.close() + } + if self.ownsStandardErrorWriteHandle { + try! self.standardErrorWriteHandle?.close() + } + assert(worked) // We just set it to running above, shouldn't be able to race (no `await`). + assert(original == RunningStateApproximation.running.rawValue) // We compare-and-exchange it. + continuation.resume(throwing: error) + } + } } // At this point, the process is running, we should therefore have a process ID (unless we're already dead). - let childPid = p.processIdentifier - _ = self.processPid.compareExchange( + let runPidExchange = self.processPid.compareExchange( expected: 0, desired: childPid, ordering: .sequentiallyConsistent ) - assert(childPid != 0 || !p.isRunning) + precondition(runPidExchange.exchanged, "run pid exchange failed: \(runPidExchange)") self.logger.debug( "running command", metadata: [ "executable": "\(self.executable)", "arguments": "\(self.arguments)", - "pid": "\(p.processIdentifier)", + "pid": "\(childPid)", ] ) @@ -591,13 +673,12 @@ public final actor ProcessExecutor { try! self.standardErrorWriteHandle?.close() // Must work. } - @Sendable - func waitForChildToExit() async -> ProcessExitReason { + @Sendable func waitForChildToExit() async -> ProcessExitReason { // Please note, we're invoking this function multiple times concurrently, so we're relying on AsyncStream // supporting this. // We do need for the child to exit (and it will, we'll eventually SIGKILL it) - await withUncancelledTask(returning: ProcessExitReason.self) { + return await withUncancelledTask(returning: ProcessExitReason.self) { var iterator = terminationStreamConsumer.makeAsyncIterator() // Let's wait for the process to finish (it will) @@ -616,11 +697,7 @@ public final actor ProcessExecutor { await withTaskGroup(of: Void.self) { triggerTeardownGroup in triggerTeardownGroup.addTask { // wait until cancelled - do { - while true { - try await Task.sleep(nanoseconds: 1_000_000_000) - } - } catch {} + do { while true { try await Task.sleep(nanoseconds: 1_000_000_000) } } catch {} let isRunning = self.processIsRunningApproximation.load(ordering: .relaxed) guard isRunning != RunningStateApproximation.finishedExecuting.rawValue else { @@ -665,8 +742,9 @@ public final actor ProcessExecutor { } var exitReason: ProcessExitReason? = nil + // cannot fix this warning yet (rdar://113844171) while let result = try await runProcessGroup.next() { - if let result { + if let result = result { exitReason = result } } @@ -674,9 +752,21 @@ public final actor ProcessExecutor { } } - public func sendSignal(_ signal: CInt) async throws { + /// The processes's process identifier (pid). Please note that most use cases of this are racy because UNIX systems recycle pids after process exit. + /// + /// Best effort way to return the process identifier whilst the process is running and `nil` when it's not running. + /// This may however return the process identifier for some time after the process has already exited. + public nonisolated var bestEffortProcessIdentifier: pid_t? { let pid = self.processPid.load(ordering: .sequentiallyConsistent) - if pid == 0 || pid == -1 { + guard pid > 0 else { + assert(pid == 0 || pid == -1) // we never assign other values + return nil + } + return pid + } + + public func sendSignal(_ signal: CInt) async throws { + guard let pid = self.bestEffortProcessIdentifier else { throw OSError(errnoNumber: ESRCH, function: "sendSignal") } let ret = kill(pid, signal) @@ -691,12 +781,12 @@ extension ProcessExecutor { /// /// At present this is always `MultiThreadedEventLoopGroup.singleton`. public static var defaultEventLoopGroup: any EventLoopGroup { - globalDefaultEventLoopGroup + return globalDefaultEventLoopGroup } /// The default `Logger` for ``ProcessExecutor`` that's used if you do not override it. It won't log anything. public static var disableLogging: Logger { - globalDisableLoggingLogger + return globalDisableLoggingLogger } } @@ -710,13 +800,10 @@ extension ProcessExecutor { /// - executable: The full path to the executable to spawn /// - arguments: The arguments to the executable (not including `argv[0]`) /// - environment: The environment variables to pass to the child process. - /// If you want to inherit the calling process' environment into the child, specify - /// `ProcessInfo.processInfo.environment` - /// - standardOutput: A description of what to do with the standard output of the child process (defaults to - /// ``ProcessOutput/stream`` + /// If you want to inherit the calling process' environment into the child, specify `ProcessInfo.processInfo.environment` + /// - standardOutput: A description of what to do with the standard output of the child process (defaults to ``ProcessOutput/stream`` /// which requires to consume it via ``ProcessExecutor/standardOutput``. - /// - standardError: A description of what to do with the standard output of the child process (defaults to - /// ``ProcessOutput/stream`` + /// - standardError: A description of what to do with the standard output of the child process (defaults to ``ProcessOutput/stream`` /// which requires to consume it via ``ProcessExecutor/standardError``. /// - logger: Where to log diagnostic messages to (default to no where) public init( @@ -724,6 +811,7 @@ extension ProcessExecutor { executable: String, _ arguments: [String], environment: [String: String] = [:], + spawnOptions: SpawnOptions = .default, standardOutput: ProcessOutput = .stream, standardError: ProcessOutput = .stream, teardownSequence: TeardownSequence = TeardownSequence(), @@ -734,6 +822,7 @@ extension ProcessExecutor { executable: executable, arguments, environment: environment, + spawnOptions: spawnOptions, standardInput: EOFSequence(), standardOutput: standardOutput, standardError: standardError, @@ -744,10 +833,9 @@ extension ProcessExecutor { } private let globalDefaultEventLoopGroup: MultiThreadedEventLoopGroup = .singleton -private let globalDisableLoggingLogger: Logger = .init( - label: "swift-async-process -- never logs", - factory: { _ in SwiftLogNoOpLogHandler() } -) +private let globalDisableLoggingLogger: Logger = { + return Logger(label: "swift-async-process -- never logs", factory: { _ in SwiftLogNoOpLogHandler() }) +}() extension AsyncStream { static func justMakeIt(elementType: Element.Type = Element.self) -> ( diff --git a/Sources/AsyncProcess/ProcessExit.swift b/Sources/AsyncProcess/ProcessExit.swift index b79104b..6e88f78 100644 --- a/Sources/AsyncProcess/ProcessExit.swift +++ b/Sources/AsyncProcess/ProcessExit.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -24,6 +24,34 @@ public enum ProcessExitReason: Hashable & Sendable { } } +extension ProcessExitReason { + /// Turn into an integer like `$?` works in shells. + /// + /// Concretely, this means if the program exits normally with exit code `N`, `asShellExitCode == N`. But if the program exits because of a signal, then + /// `asShellExitCode == N + 128`, so 128 gets added to the signal number. + public var asShellExitCode: Int { + switch self { + case .exit(let code): + return Int(code) + case .signal(let code): + return 128 + Int(code) + } + } + + /// Turn into an integer like Python's subprocess does. + /// + /// Concretely, this means if the program exits normally with exit code `N`, `asShellExitCode == N`. But if the program exits because of a signal, then + /// `asShellExitCode == -N`, so the negative signal number gets returned. + public var asPythonExitCode: Int { + switch self { + case .exit(let code): + return Int(code) + case .signal(let code): + return -Int(code) + } + } +} + public struct ProcessExecutionError: Error & Hashable & Sendable { public var exitReason: ProcessExitReason @@ -34,6 +62,6 @@ public struct ProcessExecutionError: Error & Hashable & Sendable { extension ProcessExecutionError: CustomStringConvertible { public var description: String { - "process exited non-zero: \(self.exitReason)" + return "process exited non-zero: \(self.exitReason)" } } diff --git a/Sources/CProcessSpawnSync/include/CProcessSpawnSync.h b/Sources/CProcessSpawnSync/include/CProcessSpawnSync.h new file mode 100644 index 0000000..58ec43c --- /dev/null +++ b/Sources/CProcessSpawnSync/include/CProcessSpawnSync.h @@ -0,0 +1,13 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#include "ps-api.h" diff --git a/Sources/CProcessSpawnSync/include/ps-api.h b/Sources/CProcessSpawnSync/include/ps-api.h new file mode 100644 index 0000000..271e6f3 --- /dev/null +++ b/Sources/CProcessSpawnSync/include/ps-api.h @@ -0,0 +1,73 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#ifndef PS_API_H +#define PS_API_H + +#include +#include + +typedef enum ps_error_kind_s { + PS_ERROR_KIND_EXECVE = 1, + PS_ERROR_KIND_PIPE = 2, + PS_ERROR_KIND_FCNTL = 3, + PS_ERROR_KIND_SIGNAL = 4, + PS_ERROR_KIND_SIGPROC_MASK = 5, + PS_ERROR_KIND_CHDIR = 6, + PS_ERROR_KIND_SETSID = 7, + PS_ERROR_KIND_DUP2 = 8, + PS_ERROR_KIND_READ_FROM_CHILD = 9, + PS_ERROR_KIND_DUP = 10, + PS_ERROR_KIND_SIGMASK_THREAD = 11, + PS_ERROR_KIND_FAILED_CHILD_WAITPID = 12, +} ps_error_kind; + +typedef struct ps_error_s { + ps_error_kind pse_kind; + int pse_code; + const char *pse_file; + int pse_line; + int pse_extra_info; +} ps_error; + +typedef enum ps_fd_setup_kind_s { + PS_MAP_FD = 1, + PS_CLOSE_FD = 2, +} ps_fd_setup_kind; + +typedef struct ps_fd_setup_s { + ps_fd_setup_kind psfd_kind; + int psfd_parent_fd; +} ps_fd_setup; + +typedef struct ps_process_configuration_s { + const char *psc_path; + + // including argv[0] + char **psc_argv; + + char **psc_env; + + const char *psc_cwd; + + int psc_fd_setup_count; + const ps_fd_setup *psc_fd_setup_instructions; + + bool psc_new_session; + bool psc_close_other_fds; +} ps_process_configuration; + +pid_t ps_spawn_process(ps_process_configuration *config, ps_error *out_error); + +void ps_convert_exit_status(int in_status, bool *out_has_exited, bool *out_is_exit_code, int *out_code); + +#endif diff --git a/Sources/CProcessSpawnSync/internal-helpers.h b/Sources/CProcessSpawnSync/internal-helpers.h new file mode 100644 index 0000000..b57f0f6 --- /dev/null +++ b/Sources/CProcessSpawnSync/internal-helpers.h @@ -0,0 +1,87 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#ifndef INTERNAL_HELPERS_H +#define INTERNAL_HELPERS_H +#include + +static int positive_int_parse(const char *str) { + int out = 0; + char c = 0; + + while ((c = *str++) != 0) { + out *= 10; + if (c >= '0' && c <= '9') { + out += c - '0'; + } else { + return -1; + } + } + return out; +} + +static int highest_possibly_open_fd_dir(const char *fd_dir) { + int highest_fd_so_far = 0; + DIR *dir_ptr = opendir(fd_dir); + if (dir_ptr == NULL) { + return -1; + } + + struct dirent *dir_entry = NULL; + while ((dir_entry = readdir(dir_ptr)) != NULL) { + char *entry_name = dir_entry->d_name; + int number = positive_int_parse(entry_name); + if (number > (long)highest_fd_so_far) { + highest_fd_so_far = number; + } + } + + closedir(dir_ptr); + return highest_fd_so_far; +} + +static int highest_possibly_open_fd(void) { +#if defined(__APPLE__) + int hi = highest_possibly_open_fd_dir("/dev/fd"); + if (hi < 0) { + hi = getdtablesize(); + } +#elif defined(__linux__) + int hi = highest_possibly_open_fd_dir("/proc/self/fd"); + if (hi < 0) { + hi = getdtablesize(); + } +#else + int hi = 1024; +#endif + + return hi; +} + +static int block_everything_but_something_went_seriously_wrong_signals(sigset_t *old_mask) { + sigset_t mask; + int r = 0; + r |= sigfillset(&mask); + r |= sigdelset(&mask, SIGABRT); + r |= sigdelset(&mask, SIGBUS); + r |= sigdelset(&mask, SIGFPE); + r |= sigdelset(&mask, SIGILL); + r |= sigdelset(&mask, SIGKILL); + r |= sigdelset(&mask, SIGSEGV); + r |= sigdelset(&mask, SIGSTOP); + r |= sigdelset(&mask, SIGSYS); + r |= sigdelset(&mask, SIGTRAP); + + r |= pthread_sigmask(SIG_BLOCK, &mask, old_mask); + return r; +} +#endif diff --git a/Sources/CProcessSpawnSync/spawner.c b/Sources/CProcessSpawnSync/spawner.c new file mode 100644 index 0000000..315bb75 --- /dev/null +++ b/Sources/CProcessSpawnSync/spawner.c @@ -0,0 +1,369 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#if __has_include() +#include +#endif +#include + +#include "internal-helpers.h" + +#if __has_include() +int close_range(unsigned int first, unsigned int last, int flags); +#endif + +#define MAKE_PS_ERROR_FROM_ERRNO(__kind) \ +(ps_error){ \ + .pse_kind = (__kind), \ + .pse_code = errno, \ + .pse_file = __FILE__, \ + .pse_line = __LINE__ \ +} + +#if __apple__ +# define PS_SIG_MAX __DARWIN_NSIG +#else +# define PS_SIG_MAX 32 +#endif + +#define ps_precondition(__cond) do { \ + int eval = (__cond); \ + if (!eval) { \ + __builtin_trap(); \ + } \ +} while(0) + +#if defined(NDEBUG) +# define ps_assert(__cond) do { } while(0) +#else +# define ps_assert(__cond) do { \ + int eval = (__cond); \ + if (!eval) { \ + __builtin_trap(); \ + } \ +} while(0) +#endif + +/// Lock just around the `fork` to protect the signal masking +static pthread_mutex_t g_fork_lock = PTHREAD_MUTEX_INITIALIZER; + +struct child_scratch { + int duplicated_fd; +}; + +static void setup_and_execve_child(ps_process_configuration *config, int error_pipe, struct child_scratch *scratch) { + ps_error error = { 0 }; + sigset_t sigset = { 0 }; + int err = -1; + + /* reset signal handlers */ + for (int signo = 1; signo < PS_SIG_MAX; signo++) { + if (signo == SIGKILL || signo == SIGSTOP) { + continue; + } + void (*err_ptr)(int) = signal(signo, SIG_DFL); + if (err_ptr != SIG_ERR) { + continue; + } + + if (errno == EINVAL) { + break; // probably too high of a signal + } + + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SIGNAL); + error.pse_extra_info = signo; + goto write_fail; + } + + /* reset signal mask */ + sigemptyset(&sigset); + err = sigprocmask(SIG_SETMASK, &sigset, NULL) != 0; + if (err) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SIGPROC_MASK); + goto write_fail; + } + + if (config->psc_new_session) { + err = setsid(); + if (err) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SETSID); + goto write_fail; + } + } + + for (int child_fd=0; child_fdpsc_fd_setup_count; child_fd++) { + ps_fd_setup setup = config->psc_fd_setup_instructions[child_fd]; + + switch (setup.psfd_kind) { + case PS_MAP_FD: + scratch[child_fd].duplicated_fd = fcntl(setup.psfd_parent_fd, F_DUPFD_CLOEXEC, config->psc_fd_setup_count); + if (scratch[child_fd].duplicated_fd == -1) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_DUP); + error.pse_extra_info = child_fd; + goto write_fail; + } + break; + case PS_CLOSE_FD: + scratch[child_fd].duplicated_fd = -1; + break; + default: + ps_precondition(0); + } + } + + for (int child_fd=0; child_fdpsc_fd_setup_count; child_fd++) { + ps_fd_setup setup = config->psc_fd_setup_instructions[child_fd]; + switch (setup.psfd_kind) { + case PS_MAP_FD: + ps_precondition(scratch[child_fd].duplicated_fd > child_fd); + err = dup2(scratch[child_fd].duplicated_fd, child_fd); + if (err == -1) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_DUP2); + error.pse_extra_info = child_fd; + goto write_fail; + } + break; + case PS_CLOSE_FD: + ps_precondition(scratch[child_fd].duplicated_fd == -1); + close(child_fd); + break; + default: + ps_precondition(0); + } + } + + if (config->psc_close_other_fds) { + int close_range_err = -1; + errno = ENOSYS; +#if __has_include() + if (error_pipe > config->psc_fd_setup_count) { + // We mustn't close `error_pipe`. + close_range_err = close_range(config->psc_fd_setup_count, error_pipe - 1, 0); + close_range_err |= close_range(error_pipe + 1, ~0U, 0); + } else { + int from = config->psc_fd_setup_count == error_pipe ? error_pipe + 1 : config->psc_fd_setup_count; + close_range_err = close_range(from, ~0U, 0); + } +#endif + if (close_range_err) { + // close_range failed (or doesn't exist), let's fall back onto this + for (int i=config->psc_fd_setup_count; ipsc_cwd) { + err = chdir(config->psc_cwd); + if (err) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_CHDIR); + goto write_fail; + } + } + + /* finally, exec */ + err = execve(config->psc_path, config->psc_argv, config->psc_env); + if (err) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_EXECVE); + goto write_fail; + } + + __builtin_unreachable(); + + write_fail: + write(error_pipe, &error, sizeof(error)); + close(error_pipe); + _exit(253); +} + +pid_t ps_spawn_process(ps_process_configuration *config, ps_error *out_error) { + pid_t pid = -1; + sigset_t old_sigmask; + struct child_scratch *scratch = NULL; + int error_pid_fd[2] = { -1, -1 }; + int err = pipe(error_pid_fd); + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_PIPE); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + err = fcntl(error_pid_fd[0], F_SETFD, FD_CLOEXEC); + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_FCNTL); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + err = fcntl(error_pid_fd[1], F_SETFD, FD_CLOEXEC); + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_FCNTL); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + scratch = calloc(config->psc_fd_setup_count, sizeof(*scratch)); + + // We need to protect the signal masking below (we unlock this in the parent only, child's gonna execve anyway). + err = pthread_mutex_lock(&g_fork_lock); + ps_precondition(err == 0); + + /* block all signals on this thread, don't want things to go wrong post-fork, pre-execve */ + err = block_everything_but_something_went_seriously_wrong_signals(&old_sigmask); + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SIGMASK_THREAD); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + if (( +#if defined(__linux__) || ASYNC_PROCESS_FORCE_VFORK + pid = vfork() +#else + pid = fork() +#endif + ) == 0) { + /* child */ + setup_and_execve_child(config, error_pid_fd[1], scratch); + exit(254); + } else { + /* parent */ + err = pthread_sigmask(SIG_SETMASK, &old_sigmask, NULL); /* restore old sigmask */ + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SIGMASK_THREAD); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + err = pthread_mutex_unlock(&g_fork_lock); + ps_precondition(err == 0); + + if (pid > 0) { + ps_error child_error = { 0 }; + close(error_pid_fd[1]); + error_pid_fd[1] = -1; + + free(scratch); + scratch = NULL; + + while (true) { + ssize_t read_res = read(error_pid_fd[0], &child_error, sizeof(child_error)); + if (read_res == 0) { + /* EOF, that's good, execve worked. */ + close(error_pid_fd[0]); + error_pid_fd[0] = -1; + return pid; + } else if (read_res > 0) { + ps_precondition(read_res == sizeof(child_error)); + if (out_error) { + *out_error = child_error; + } + goto error_cleanup; + if (errno == EINTR) { + continue; + } else { + ps_assert(0); + /* This is very bad (and unexpected), we forked a child but don't know its whereabouts */ + kill(pid, SIGKILL); /* last ditch attempt to terminate the child process just in case it survived */ + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_READ_FROM_CHILD); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + } + } + } else { + pid = 0; /* nothing to waitpid on */ + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_FCNTL); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + } + +error_cleanup: + if (pid > 0) { + while (true) { + int wp_result = waitpid(pid, NULL, 0); + if (wp_result > 0) { + /* success */ + break; + } else if (err == -1) { + if (errno == EINTR) { + continue; + } else { + /* bad & unexpected */ + ps_assert(0); + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_FAILED_CHILD_WAITPID); + if (out_error) { + *out_error = error; + } + break; + } + } else { + ps_precondition(0); + } + } + } + if (error_pid_fd[0] != -1) { + close(error_pid_fd[0]); + } + if (error_pid_fd[1] != -1) { + close(error_pid_fd[1]); + } + free(scratch); + ps_precondition((!out_error) || (out_error->pse_kind != 0)); + return 0; +} + +void ps_convert_exit_status(int in_status, bool *out_has_exited, bool *out_is_exit_code, int *out_code) { + if (WIFEXITED(in_status)) { + *out_has_exited = true; + *out_is_exit_code = true; + *out_code = WEXITSTATUS(in_status); + } else if (WIFSIGNALED(in_status)) { + *out_has_exited = true; + *out_is_exit_code = false; + *out_code = WTERMSIG(in_status); + } else { + *out_has_exited = false; + *out_is_exit_code = false; + *out_code = -1; + } +} diff --git a/Sources/ProcessSpawnSync/ProcessSpawner.swift b/Sources/ProcessSpawnSync/ProcessSpawner.swift new file mode 100644 index 0000000..cabcc6b --- /dev/null +++ b/Sources/ProcessSpawnSync/ProcessSpawner.swift @@ -0,0 +1,414 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import Atomics +import CProcessSpawnSync +import NIOConcurrencyHelpers + +#if os(iOS) || os(tvOS) || os(watchOS) + // Note: Process in iOS/tvOS/watchOS is available in internal builds only under Foundation Private/headers + import Foundation_Private.NSTask +#else + import Foundation +#endif + +extension ps_error_s { + private func makeDescription() -> String { + return """ + PSError(\ + kind: \(self.pse_kind.rawValue), \ + errno: \(self.pse_code), \ + file: \(String(cString: self.pse_file)), \ + line: \(self.pse_line)\ + \(self.pse_extra_info != 0 ? ", extra: \(self.pse_extra_info)" : "" + ) + """ + } +} + +#if compiler(>=6.0) + extension ps_error_s: @retroactive CustomStringConvertible { + public var description: String { + return self.makeDescription() + } + } +#else + extension ps_error_s: CustomStringConvertible { + public var description: String { + return self.makeDescription() + } + } +#endif + +public struct PSProcessUnknownError: Error & CustomStringConvertible { + var reason: String + + public var description: String { + return self.reason + } +} + +public final class PSProcess: Sendable { + struct State: Sendable { + var executableURL: URL? = nil + var arguments: [String] = [] + var environment: [String: String] = [:] + var currentDirectoryURL: URL? = nil + var closeOtherFileDescriptors: Bool = true + var createNewSession: Bool = false + private(set) var pidWhenRunning: pid_t? = nil + var standardInput: Pipe? = nil + var standardOutput: FileHandle? = nil + var standardError: FileHandle? = nil + var terminationHandler: (@Sendable (PSProcess) -> Void)? = nil + private(set) var procecesIdentifier: pid_t? = nil + private(set) var terminationStatus: (Process.TerminationReason, CInt)? = nil + + mutating func setRunning(pid: pid_t, isRunningApproximation: ManagedAtomic) { + assert(self.pidWhenRunning == nil) + self.pidWhenRunning = pid + self.procecesIdentifier = pid + isRunningApproximation.store(true, ordering: .relaxed) + } + + mutating func setNotRunning( + terminationStaus: (Process.TerminationReason, CInt), + isRunningApproximation: ManagedAtomic + ) -> @Sendable (PSProcess) -> Void { + assert(self.pidWhenRunning != nil) + isRunningApproximation.store(false, ordering: .relaxed) + self.pidWhenRunning = nil + self.terminationStatus = terminationStaus + let terminationHandler = self.terminationHandler ?? { _ in } + self.terminationHandler = nil + return terminationHandler + } + } + + let state = NIOLockedValueBox(State()) + let isRunningApproximation = ManagedAtomic(false) + + public init() {} + + public func run() throws { + let state = self.state.withLockedValue { $0 } + + guard let pathString = state.executableURL?.path.removingPercentEncoding else { + throw PSProcessUnknownError(reason: "executableURL is nil") + } + let cwdString = state.currentDirectoryURL?.path.removingPercentEncoding + let path = copyOwnedCTypedString(pathString) + defer { + path.deallocate() + } + let cwd = cwdString.map { copyOwnedCTypedString($0) } + defer { + cwd?.deallocate() + } + let args = copyOwnedCTypedStringArray([pathString] + state.arguments) + defer { + var index = 0 + var arg = args[index] + while arg != nil { + arg!.deallocate() + index += 1 + arg = args[index] + } + } + let envs = copyOwnedCTypedStringArray((state.environment.map { k, v in "\(k)=\(v)" })) + defer { + var index = 0 + var env = envs[index] + while env != nil { + env!.deallocate() + index += 1 + env = envs[index] + } + } + + let psSetup: [ps_fd_setup] = [ + ps_fd_setup( + psfd_kind: PS_MAP_FD, + psfd_parent_fd: state.standardInput?.fileHandleForReading.fileDescriptor ?? STDIN_FILENO + ), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: state.standardOutput?.fileDescriptor ?? STDOUT_FILENO), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: state.standardError?.fileDescriptor ?? STDERR_FILENO), + ] + let (pid, error) = psSetup.withUnsafeBufferPointer { psSetupPtr -> (pid_t, ps_error) in + var config = ps_process_configuration_s( + psc_path: path, + psc_argv: args, + psc_env: envs, + psc_cwd: cwd, + psc_fd_setup_count: CInt(psSetupPtr.count), + psc_fd_setup_instructions: psSetupPtr.baseAddress!, + psc_new_session: state.createNewSession, + psc_close_other_fds: state.closeOtherFileDescriptors + ) + var error = ps_error() + let pid = ps_spawn_process(&config, &error) + return (pid, error) + } + try! state.standardInput?.fileHandleForReading.close() + guard pid > 0 else { + switch (error.pse_kind, error.pse_code) { + case (PS_ERROR_KIND_EXECVE, ENOENT), + (PS_ERROR_KIND_EXECVE, ENOTDIR), + (PS_ERROR_KIND_CHDIR, ENOENT), + (PS_ERROR_KIND_CHDIR, ENOTDIR): + throw NSError( + domain: NSCocoaErrorDomain, + code: NSFileNoSuchFileError, + userInfo: ["underlying-error": "\(error)"] + ) + default: + throw PSProcessUnknownError(reason: "\(error)") + } + } + self.state.withLockedValue { state in + state.setRunning(pid: pid, isRunningApproximation: self.isRunningApproximation) + } + + let q = DispatchQueue(label: "q") + let source = DispatchSource.makeSignalSource(signal: SIGCHLD, queue: q) + source.setEventHandler { + if let terminationHandler = self.terminationHandlerFinishedRunning() { + source.cancel() + terminationHandler(self) + } + } + source.setRegistrationHandler { + if let terminationHandler = self.terminationHandlerFinishedRunning() { + source.cancel() + q.async { + terminationHandler(self) + } + } + } + source.resume() + } + + public var processIdentifier: pid_t { + return self.state.withLockedValue { state in + return state.procecesIdentifier! + } + } + + public var terminationReason: Process.TerminationReason { + return self.state.withLockedValue { state in + state.terminationStatus!.0 + } + } + + public var terminationStatus: CInt { + return self.state.withLockedValue { state in + state.terminationStatus!.1 + } + } + + public var isRunning: Bool { + return self.isRunningApproximation.load(ordering: .relaxed) + } + + internal func terminationHandlerFinishedRunning() -> (@Sendable (PSProcess) -> Void)? { + return self.state.withLockedValue { state -> (@Sendable (PSProcess) -> Void)? in + guard let pid = state.pidWhenRunning else { + return nil + } + var status: CInt = -1 + while true { + let err = waitpid(pid, &status, WNOHANG) + if err == -1 { + if errno == EINTR { + continue + } else { + preconditionFailure("waitpid failed with \(errno)") + } + } else { + var hasExited = false + var isExitCode = false + var code: CInt = 0 + ps_convert_exit_status(status, &hasExited, &isExitCode, &code) + if hasExited { + return state.setNotRunning( + terminationStaus: (isExitCode ? .exit : .uncaughtSignal, code), + isRunningApproximation: self.isRunningApproximation + ) + } else { + return nil + } + } + } + } + } + + public var executableURL: URL? { + get { + self.state.withLockedValue { state in + state.executableURL + } + } + set { + self.state.withLockedValue { state in + state.executableURL = newValue + } + } + } + + public var currentDirectoryURL: URL? { + get { + self.state.withLockedValue { state in + state.currentDirectoryURL + } + } + set { + self.state.withLockedValue { state in + state.currentDirectoryURL = newValue + } + } + } + + public var launchPath: String? { + get { + self.state.withLockedValue { state in + state.executableURL?.absoluteString + } + } + set { + self.state.withLockedValue { state in + state.executableURL = newValue.map { URL(fileURLWithPath: $0) } + } + } + } + + public var arguments: [String] { + get { + self.state.withLockedValue { state in + state.arguments + } + } + set { + self.state.withLockedValue { state in + state.arguments = newValue + } + } + } + + public var environment: [String: String] { + get { + self.state.withLockedValue { state in + state.environment + } + } + set { + self.state.withLockedValue { state in + state.environment = newValue + } + } + } + + public var standardOutput: FileHandle? { + get { + self.state.withLockedValue { state in + state.standardOutput + } + } + set { + self.state.withLockedValue { state in + state.standardOutput = newValue + } + } + } + + public var standardError: FileHandle? { + get { + self.state.withLockedValue { state in + state.standardError + } + } + set { + self.state.withLockedValue { state in + state.standardError = newValue + } + } + } + + public var standardInput: Pipe? { + get { + self.state.withLockedValue { state in + state.standardInput + } + } + set { + self.state.withLockedValue { state in + state.standardInput = newValue + } + } + } + + public var terminationHandler: (@Sendable (PSProcess) -> Void)? { + get { + self.state.withLockedValue { state in + state.terminationHandler + } + } + set { + self.state.withLockedValue { state in + state.terminationHandler = newValue + } + } + } + + public var _closeOtherFileDescriptors: Bool { + get { + self.state.withLockedValue { state in + return state.closeOtherFileDescriptors + } + } + set { + self.state.withLockedValue { state in + state.closeOtherFileDescriptors = newValue + } + } + } + + public var _createNewSession: Bool { + get { + self.state.withLockedValue { state in + return state.createNewSession + } + } + set { + self.state.withLockedValue { state in + state.createNewSession = newValue + } + } + } +} + +func copyOwnedCTypedString(_ string: String) -> UnsafeMutablePointer { + let out = UnsafeMutableBufferPointer.allocate(capacity: string.utf8.count + 1) + _ = out.initialize(from: string.utf8.map { CChar(bitPattern: $0) }) + out[out.endIndex - 1] = 0 + + return out.baseAddress! +} + +func copyOwnedCTypedStringArray(_ array: [String]) -> UnsafeMutablePointer?> { + let out = UnsafeMutableBufferPointer?>.allocate(capacity: array.count + 1) + for (index, string) in array.enumerated() { + out[index] = copyOwnedCTypedString(string) + } + out[out.endIndex - 1] = nil + + return out.baseAddress! +} diff --git a/Tests/AsyncProcessTests/AsyncByteBufferLineSequenceTests.swift b/Tests/AsyncProcessTests/AsyncByteBufferLineSequenceTests.swift index d2d693e..5a760f2 100644 --- a/Tests/AsyncProcessTests/AsyncByteBufferLineSequenceTests.swift +++ b/Tests/AsyncProcessTests/AsyncByteBufferLineSequenceTests.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -43,9 +43,7 @@ final class AsyncByteBufferLineSequenceTests: XCTestCase { func testManyChunksNoNewlineNotDeliveringLastChunk() async throws { for n in 0..<100 { let inputs: [ByteBuffer] = [ByteBuffer(repeating: 0, count: n)] - let lines = try await Array( - inputs.async.splitIntoLines(dropLastChunkIfNoNewline: true).strings - ) + let lines = try await Array(inputs.async.splitIntoLines(dropLastChunkIfNoNewline: true).strings) XCTAssertEqual([], lines) } } diff --git a/Tests/AsyncProcessTests/Helpers+LogRecorderHandler.swift b/Tests/AsyncProcessTests/Helpers+LogRecorderHandler.swift index a9dfb16..56e7ab4 100644 --- a/Tests/AsyncProcessTests/Helpers+LogRecorderHandler.swift +++ b/Tests/AsyncProcessTests/Helpers+LogRecorderHandler.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -13,8 +13,8 @@ import Logging import NIOConcurrencyHelpers -final class LogRecorderHandler: LogHandler { - let state = NIOLockedValueBox(State()) +final internal class LogRecorderHandler: LogHandler { + internal let state = NIOLockedValueBox(State()) struct FullLogMessage: Equatable { var level: Logger.Level @@ -29,7 +29,7 @@ final class LogRecorderHandler: LogHandler { } func makeLogger() -> Logger { - Logger(label: "LogRecorder for tests", factory: { _ in self }) + return Logger(label: "LogRecorder for tests", factory: { _ in self }) } func log( @@ -44,7 +44,7 @@ final class LogRecorderHandler: LogHandler { let fullMessage = FullLogMessage( level: level, message: message, - metadata: self.metadata.merging(metadata ?? [:]) { _, r in r } + metadata: self.metadata.merging(metadata ?? [:]) { l, r in r } ) self.state.withLockedValue { state in state.messages.append(fullMessage) @@ -52,12 +52,12 @@ final class LogRecorderHandler: LogHandler { } var recordedMessages: [FullLogMessage] { - self.state.withLockedValue { $0.messages } + return self.state.withLockedValue { $0.messages } } subscript(metadataKey key: String) -> Logging.Logger.Metadata.Value? { get { - self.state.withLockedValue { + return self.state.withLockedValue { $0.metadata[key] } } @@ -70,13 +70,13 @@ final class LogRecorderHandler: LogHandler { var metadata: Logging.Logger.Metadata { get { - self.state.withLockedValue { + return self.state.withLockedValue { $0.metadata } } set { - self.state.withLockedValue { + return self.state.withLockedValue { $0.metadata = newValue } } @@ -84,13 +84,13 @@ final class LogRecorderHandler: LogHandler { var logLevel: Logging.Logger.Level { get { - self.state.withLockedValue { + return self.state.withLockedValue { $0.logLevel } } set { - self.state.withLockedValue { + return self.state.withLockedValue { $0.logLevel = newValue } } diff --git a/Tests/AsyncProcessTests/IntegrationTests.swift b/Tests/AsyncProcessTests/IntegrationTests.swift index 922e37a..f79cd50 100644 --- a/Tests/AsyncProcessTests/IntegrationTests.swift +++ b/Tests/AsyncProcessTests/IntegrationTests.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift open source project // -// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -20,13 +20,24 @@ import XCTest #if canImport(Darwin) import Darwin +#elseif canImport(Musl) + @preconcurrency import Musl +#elseif canImport(Glibc) + @preconcurrency import Glibc +#elseif canImport(WASILibc) + @preconcurrency import WASILibc +#elseif canImport(Bionic) + @preconcurrency import Bionic +#elseif canImport(Android) + @preconcurrency import Android #else - import Glibc + #error("unknown libc, please fix") #endif final class IntegrationTests: XCTestCase { private var group: EventLoopGroup! private var logger: Logger! + private var highestFD: CInt? func testTheBasicsWork() async throws { let exe = ProcessExecutor( @@ -39,7 +50,7 @@ final class IntegrationTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { for try await chunk in await merge(exe.standardOutput, exe.standardError) { - XCTFail("unexpected output: \(chunk.debugDescription)") + XCTFail("unexpected output: \(String(buffer: chunk)): \(chunk)") } } let result = try await exe.run() @@ -48,7 +59,7 @@ final class IntegrationTests: XCTestCase { } func testExitCodesWork() async throws { - for exitCode in UInt8.min...UInt8.max { + for exitCode in (UInt8.min...UInt8.max) { let exe = ProcessExecutor( group: self.group, executable: "/bin/sh", @@ -59,23 +70,47 @@ final class IntegrationTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { for try await chunk in await merge(exe.standardOutput, exe.standardError) { - XCTFail("unexpected output: \(chunk.debugDescription)") + XCTFail("unexpected output: \(String(buffer: chunk)): \(chunk)") } } let result = try await exe.run() XCTAssertEqual(.exit(CInt(exitCode)), result) + XCTAssertEqual(Int(exitCode), result.asShellExitCode) + XCTAssertEqual(Int(exitCode), result.asPythonExitCode) } } } + #if ASYNC_PROCESS_ENABLE_TESTS_WITH_PLATFORM_ASSUMPTIONS + // The test below won't work on many shells ("/bin/sh: 1: exit: Illegal number: -999999999") + func testWeirdExitCodesWork() async throws { + for (exitCode, expected) in [(-1, 255), (-2, 254), (256, 0), (99_999_999, 255), (-999_999_999, 1)] { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "exit \(exitCode)"], + standardInput: EOFSequence(), + logger: self.logger + ) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + for try await chunk in await merge(exe.standardOutput, exe.standardError) { + XCTFail("unexpected output: \(String(buffer: chunk)): \(chunk)") + } + } + + let result = try await exe.run() + XCTAssertEqual(.exit(CInt(expected)), result) + XCTAssertEqual(Int(expected), result.asShellExitCode) + XCTAssertEqual(Int(expected), result.asPythonExitCode) + } + } + } + #endif + func testSignalsWork() async throws { - #if os(Linux) - // workaround for https://github.com/apple/swift-corelibs-foundation/issues/4772 - let signalsToTest: [CInt] = [SIGKILL] - #else - let signalsToTest: [CInt] = [SIGKILL, SIGTERM, SIGINT] - #endif + let signalsToTest: [CInt] = [SIGKILL, SIGTERM, SIGINT] for signal in signalsToTest { let exe = ProcessExecutor( group: self.group, @@ -88,12 +123,14 @@ final class IntegrationTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { for try await chunk in await merge(exe.standardOutput, exe.standardError) { - XCTFail("unexpected output: \(chunk.debugDescription)") + XCTFail("unexpected output: \(String(buffer: chunk)): \(chunk)") } } let result = try await exe.run() XCTAssertEqual(.signal(CInt(signal)), result) + XCTAssertEqual(128 + Int(signal), result.asShellExitCode) + XCTAssertEqual(-Int(signal), result.asPythonExitCode) } } } @@ -126,13 +163,13 @@ final class IntegrationTests: XCTestCase { group.addTask { for try await chunk in await exe.standardError { - XCTFail("unexpected stderr output: \(chunk.debugDescription)") + XCTFail("unexpected output: \(String(buffer: chunk)): \(chunk)") } return nil } group.addTask { - try await exe.run() + return try await exe.run() } input.producer.yield(ByteBuffer(string: "GO\n")) @@ -147,7 +184,7 @@ final class IntegrationTests: XCTestCase { while let furtherReturn = try await group.next() { totalTasksReturned += 1 switch furtherReturn { - case let .some(result): + case .some(let result): // the `exe.run()` task XCTAssert(.signal(SIGKILL) == result || .exit(0) == result) case .none: @@ -222,9 +259,9 @@ final class IntegrationTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { var allOutput: [String] = [] - for try await (stream, line) in await merge( - exe.standardOutput.splitIntoLines(dropTerminator: true).map { ("stdout", $0) }, - exe.standardError.splitIntoLines(dropTerminator: true).map { ("stderr", $0) } + for try await (stream, line) in merge( + await exe.standardOutput.splitIntoLines(dropTerminator: true).map { ("stdout", $0) }, + await exe.standardError.splitIntoLines(dropTerminator: true).map { ("stderr", $0) } ) { let formattedOutput = "\(String(buffer: line)) [\(stream)]" allOutput.append(formattedOutput) @@ -232,8 +269,8 @@ final class IntegrationTests: XCTestCase { XCTAssertEqual( [ - "hello stderr [stderr]", - "hello stdout [stdout]", + ("hello stderr [stderr]"), + ("hello stdout [stdout]"), ], allOutput.sorted() ) @@ -317,6 +354,8 @@ final class IntegrationTests: XCTestCase { } func testOutputWithoutNewlinesThatIsSplitIntoLines() async throws { + self.logger = Logger(label: "x") + self.logger.logLevel = .trace let exe = ProcessExecutor( group: self.group, executable: "/bin/sh", @@ -347,10 +386,10 @@ final class IntegrationTests: XCTestCase { } let everything = try await Array(group).sorted { l, r in - guard let l else { + guard let l = l else { return true } - guard let r else { + guard let r = r else { return false } return l.0 < r.0 @@ -371,7 +410,7 @@ final class IntegrationTests: XCTestCase { let exe = ProcessExecutor( group: self.group, executable: "/bin/dd", - ["if=/dev/zero", "bs=\(1024 * 1024)", "count=1024", "status=none"], + ["if=/dev/zero", "bs=\(1024*1024)", "count=1024", "status=none"], standardInput: EOFSequence(), standardOutput: .discard, standardError: .stream, @@ -387,7 +426,7 @@ final class IntegrationTests: XCTestCase { let exe = ProcessExecutor( group: self.group, executable: "/bin/sh", - ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=1024 status=none; echo OK"], + ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024*1024) count=1024 status=none; echo OK"], standardInput: EOFSequence(), standardOutput: .stream, standardError: .discard, @@ -409,13 +448,13 @@ final class IntegrationTests: XCTestCase { let file = tempDir.appendingPathComponent("file") - let exe = try ProcessExecutor( + let exe = ProcessExecutor( group: self.group, executable: "/bin/dd", - ["if=/dev/zero", "bs=\(1024 * 1024)", "count=3", "status=none"], + ["if=/dev/zero", "bs=\(1024*1024)", "count=3", "status=none"], standardInput: EOFSequence(), standardOutput: .fileDescriptor( - takingOwnershipOf: .open( + takingOwnershipOf: try .open( .init(file.path.removingPercentEncoding!), .writeOnly, options: .create, @@ -445,7 +484,7 @@ final class IntegrationTests: XCTestCase { let exe = ProcessExecutor( group: self.group, executable: "/bin/sh", - ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=3 status=none; echo OK"], + ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024*1024) count=3 status=none; echo OK"], standardInput: EOFSequence(), standardOutput: .stream, standardError: .fileDescriptor( @@ -512,7 +551,7 @@ final class IntegrationTests: XCTestCase { executable: "/bin/sh", [ "-c", - "/bin/dd if=/dev/zero bs=\(1024 * 1024) count=1; /bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=1;", + "/bin/dd if=/dev/zero bs=\(1024*1024) count=1; /bin/dd >&2 if=/dev/zero bs=\(1024*1024) count=1;", ], standardInput: EOFSequence(), standardOutput: .discard, @@ -525,7 +564,7 @@ final class IntegrationTests: XCTestCase { func testLogOutputToMetadata() async throws { let sharedRecorder = LogRecorderHandler() - var recordedLogger = Logger(label: "recorder", factory: { _ in sharedRecorder }) + var recordedLogger = Logger(label: "recorder", factory: { label in sharedRecorder }) recordedLogger.logLevel = .info // don't give us the normal messages recordedLogger[metadataKey: "yo"] = "hey" @@ -535,23 +574,19 @@ final class IntegrationTests: XCTestCase { ["-c", "echo 1; echo >&2 2; echo 3; echo >&2 4; echo 5; echo >&2 6; echo 7; echo >&2 8;"], standardInput: EOFSequence(), logger: recordedLogger, - logConfiguration: OutputLoggingSettings( - logLevel: .critical, - to: .metadata(logMessage: "msg", key: "key") - ) + logConfiguration: OutputLoggingSettings(logLevel: .critical, to: .metadata(logMessage: "msg", key: "key")) ).throwIfNonZero() XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.level == .critical }) XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.message == "msg" }) XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.metadata["key"] != nil }) XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.metadata["yo"] == "hey" }) - let loggedLines = sharedRecorder.recordedMessages.compactMap { $0.metadata["key"]?.description } - .sorted() + let loggedLines = sharedRecorder.recordedMessages.compactMap { $0.metadata["key"]?.description }.sorted() XCTAssertEqual(["1", "2", "3", "4", "5", "6", "7", "8"], loggedLines) } func testLogOutputToMessage() async throws { let sharedRecorder = LogRecorderHandler() - var recordedLogger = Logger(label: "recorder", factory: { _ in sharedRecorder }) + var recordedLogger = Logger(label: "recorder", factory: { label in sharedRecorder }) recordedLogger.logLevel = .info // don't give us the normal messages recordedLogger[metadataKey: "yo"] = "hey" @@ -566,7 +601,7 @@ final class IntegrationTests: XCTestCase { XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.level == .critical }) XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.metadata["key"] == nil }) XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.metadata["yo"] == "hey" }) - let loggedLines = sharedRecorder.recordedMessages.map(\.message.description).sorted() + let loggedLines = sharedRecorder.recordedMessages.map { $0.message.description }.sorted() XCTAssertEqual(["1", "2", "3", "4", "5", "6", "7", "8"], loggedLines) } @@ -593,7 +628,7 @@ final class IntegrationTests: XCTestCase { ).throwIfNonZero() XCTAssertEqual( ["1", "2", "3", "4", "5", "6", "7", "8"], - collectedLines.withLockedValue { $0.map(\.1) }.sorted() + collectedLines.withLockedValue { $0.map { $0.1 } }.sorted() ) } @@ -602,11 +637,11 @@ final class IntegrationTests: XCTestCase { try await ProcessExecutor.runProcessingOutput( group: self.group, executable: "/bin/dd", - ["if=/dev/zero", "bs=\(1024 * 1024)", "count=20", "status=none"], + ["if=/dev/zero", "bs=\(1024*1024)", "count=20", "status=none"], standardInput: EOFSequence(), outputProcessor: { stream, chunk in XCTAssertEqual(stream, .standardOutput) - XCTAssert(chunk.withUnsafeReadableBytes { $0.allSatisfy { $0 == 0 } }) + XCTAssert(chunk.withUnsafeReadableBytes { $0.allSatisfy({ $0 == 0 }) }) collectedBytes.wrappingIncrement(by: chunk.readableBytes, ordering: .relaxed) }, splitOutputIntoLines: true, @@ -769,11 +804,8 @@ final class IntegrationTests: XCTestCase { let result = try await exe.run() XCTFail("got result for bad executable: \(result)") } catch { - XCTAssertEqual(NSCocoaErrorDomain, (error as NSError).domain) - #if canImport(Darwin) - // https://github.com/apple/swift-corelibs-foundation/issues/4810 - XCTAssertEqual(NSFileNoSuchFileError, (error as NSError).code) - #endif + XCTAssertEqual(NSCocoaErrorDomain, (error as NSError).domain, "\(error)") + XCTAssertEqual(NSFileNoSuchFileError, (error as NSError).code, "\(error)") } } @@ -806,6 +838,7 @@ final class IntegrationTests: XCTestCase { ["-c", "true"], standardInput: EOFSequence() ) { _, _ in + return }.throwIfNonZero() try await ProcessExecutor.runLogOutput( @@ -862,6 +895,7 @@ final class IntegrationTests: XCTestCase { ).exitReason.throwIfNonZero() try await ProcessExecutor.runProcessingOutput(executable: "/bin/sh", ["-c", "true"]) { _, _ in + return }.throwIfNonZero() try await ProcessExecutor.runLogOutput( @@ -880,9 +914,7 @@ final class IntegrationTests: XCTestCase { XCTAssertNoThrow(try FileManager.default.removeItem(at: tempDir)) } - for (stdoutMode, stderrMode) in [ - ("shared", "shared"), ("shared", "owned"), ("owned", "shared"), - ] { + for (stdoutMode, stderrMode) in [("shared", "shared"), ("shared", "owned"), ("owned", "shared")] { let filePath = tempDir.appendingPathComponent("file-\(stdoutMode)-\(stderrMode)") let fd = try FileDescriptor.open( .init(filePath.path.removingPercentEncoding!), @@ -910,17 +942,8 @@ final class IntegrationTests: XCTestCase { stderr = .fileDescriptor(sharing: fd) } - #if canImport(Darwin) - let command = - "for o in 1 2; do i=1000; while [ $i -gt 0 ]; do echo $o >&$o; i=$(( $i - 1 )); done & done; wait" - #else - // workaround for - // https://github.com/apple/swift-corelibs-foundation/issues/4772 - // which causes `SIGCHLD` being blocked in the shell so it can't wait for its children :| - let command = - "for o in 1 2; do i=1000; while [ $i -gt 0 ]; do echo $o >&$o; i=$(( $i - 1 )); done & done; sleep 10" - #endif - + let command = + "for o in 1 2; do i=1000; while [ $i -gt 0 ]; do echo $o >&$o; i=$(( $i - 1 )); done & done; wait" let exe = ProcessExecutor( group: self.group, executable: "/bin/sh", @@ -948,7 +971,7 @@ final class IntegrationTests: XCTestCase { returning: ProcessExitReason.self ) { group in group.addTask { - try await ProcessExecutor.run( + return try await ProcessExecutor.run( executable: "/bin/sh", ["-c", "trap 'echo no' TERM; while true; do sleep 1; done"] ) @@ -960,7 +983,7 @@ final class IntegrationTests: XCTestCase { while let result = try await group.next() { group.cancelAll() - if let result { + if let result = result { return result } } @@ -970,14 +993,14 @@ final class IntegrationTests: XCTestCase { } func testCancelProcessVeryEarlyOnStressTest() async throws { - for i in 0..<1000 { + for i in 0..<100 { self.logger.debug("iteration go", metadata: ["iteration-number": "\(i)"]) let exitReason = try await withThrowingTaskGroup( of: ProcessExitReason?.self, returning: ProcessExitReason.self ) { group in group.addTask { [logger = self.logger!] in - try await ProcessExecutor.run( + return try await ProcessExecutor.run( executable: "/bin/sleep", ["100000"], logger: logger @@ -992,7 +1015,7 @@ final class IntegrationTests: XCTestCase { while let result = try await group.next() { group.cancelAll() - if let result { + if let result = result { return result } } @@ -1037,12 +1060,10 @@ final class IntegrationTests: XCTestCase { try await p.sendSignal(SIGKILL) let finalResult = try await result XCTAssertEqual(.signal(SIGKILL), finalResult) + while try await outputIterator.next() != nil {} } - #if os(macOS) - // This test will deadlock on anything that uses swift-corelibs-foundation because of - // https://github.com/apple/swift-corelibs-foundation/issues/4795 - // Foundation.Process on Linux doesn't correctly detect when child process dies (creating zombie processes) + #if ASYNC_PROCESS_ENABLE_TESTS_WITH_PLATFORM_ASSUMPTIONS func testCanDealWithRunawayChildProcesses() async throws { self.logger = Logger(label: "x") self.logger.logLevel = .info @@ -1052,23 +1073,24 @@ final class IntegrationTests: XCTestCase { "-c", """ set -e - /usr/bin/yes "Runaway process from \(#function), please file a swift-sdk-generator bug." > /dev/null & + /usr/bin/yes "Runaway process from \(#function), please file a swift-async-process bug." > /dev/null & child_pid=$! - trap "echo >&2 killing $child_pid; kill -KILL $child_pid" INT + trap "echo >&2 'child: received signal, killing grand child ($child_pid)'; kill $child_pid" INT + echo "$$" # communicate our pid to our parent echo "$child_pid" # communicate the child pid to our parent exec >&- # close stdout - echo "waiting for $child_pid" >&2 + echo "child: waiting for grand child, pid: $child_pid" >&2 wait """, ], - standardError: .discard, + standardError: .inherit, teardownSequence: [ .sendSignal(SIGINT, allowedTimeToExitNS: 10_000_000_000) ], logger: self.logger ) - try await withThrowingTaskGroup(of: pid_t?.self) { group in + try await withThrowingTaskGroup(of: (pid_t, pid_t)?.self) { group in group.addTask { let result = try await p.run() XCTAssertEqual(.exit(128 + SIGINT), result) @@ -1076,30 +1098,41 @@ final class IntegrationTests: XCTestCase { } group.addTask { - let pidString = try await String(buffer: p.standardOutput.pullAllOfIt()) - guard let pid = pid_t(pidString.dropLast()) else { - XCTFail("couldn't get pid from \(pidString)") + let pidStrings = String(buffer: try await p.standardOutput.pullAllOfIt()).split(separator: "\n") + guard let childPID = pid_t((pidStrings.dropFirst(0).first ?? "n/a")) else { + XCTFail("couldn't get child's pid from \(pidStrings)") + return nil + } + guard let grandChildPID = pid_t((pidStrings.dropFirst(1).first ?? "n/a")) else { + XCTFail("couldn't get grand child's pid from \(pidStrings)") return nil } - return pid + return (childPID, grandChildPID) } - let maybePid = try await group.next()! - let pid = try XCTUnwrap(maybePid) + let maybePids = try await group.next()! + let (childPID, grandChildPID) = try XCTUnwrap(maybePids) group.cancelAll() try await group.waitForAll() // Let's check that the subprocess (/usr/bin/yes) of our subprocess (/bin/bash) is actually dead // This is a tiny bit racy because the pid isn't immediately invalidated, so let's allow a few failures - for attempt in 0 ..< .max { - let killRet = kill(pid, 0) + for attempt in 1 ..< .max { + let killRet = kill(grandChildPID, 0) let errnoCode = errno - guard killRet == -1 || attempt > 5 else { - logger.error("kill didn't fail on attempt \(attempt), trying again...") - usleep(100_000) + if killRet == 0 && attempt < 10 { + logger.error("we expected kill to fail but it didn't. Attempt \(attempt), trying again...") + if attempt > 7 { + fputs("## lsof child:\n", stderr) + fputs(((try? await runLSOF(pid: childPID)) ?? "n/a") + "\n", stderr) + fputs("## lsof grand child:\n", stderr) + fputs(((try? await runLSOF(pid: grandChildPID)) ?? "n/a") + "\n", stderr) + fflush(stderr) + } + usleep(useconds_t(attempt) * 100_000) continue } - XCTAssertEqual(-1, killRet) + XCTAssertEqual(-1, killRet, "\(blockingLSOF(pid: grandChildPID))") XCTAssertEqual(ESRCH, errnoCode) break } @@ -1124,8 +1157,8 @@ final class IntegrationTests: XCTestCase { ], standardError: .discard, teardownSequence: [ - .sendSignal(SIGQUIT, allowedTimeToExitNS: 10_000_000), - .sendSignal(SIGTERM, allowedTimeToExitNS: 10_000_000), + .sendSignal(SIGQUIT, allowedTimeToExitNS: 200_000_000), + .sendSignal(SIGTERM, allowedTimeToExitNS: 200_000_000), .sendSignal(SIGINT, allowedTimeToExitNS: 1_000_000_000), ], logger: self.logger @@ -1134,10 +1167,7 @@ final class IntegrationTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { let result = try await p.run() - #if os(macOS) - // won't work on SCLF: https://github.com/apple/swift-corelibs-foundation/issues/4772 - XCTAssertEqual(.exit(3), result) - #endif + XCTAssertEqual(.exit(3), result) } var allLines: [String] = [] for try await line in await p.standardOutput.splitIntoLines().strings { @@ -1147,21 +1177,192 @@ final class IntegrationTests: XCTestCase { allLines.append(line) } try await group.waitForAll() - #if os(macOS) - // won't work on SCLF: https://github.com/apple/swift-corelibs-foundation/issues/4772 - XCTAssertEqual(["OK", "saw SIGQUIT", "saw SIGTERM", "saw SIGINT"], allLines) - #endif + XCTAssertEqual(["OK", "saw SIGQUIT", "saw SIGTERM", "saw SIGINT"], allLines) } } - // MARK: - Setup/teardown + func testCanInheritRandomFileDescriptors() async throws { + guard ProcessExecutor.isBackedByPSProcess else { + return // Foundation.Process does not support this + } + var spawnOptions = ProcessExecutor.SpawnOptions.default + spawnOptions.closeOtherFileDescriptors = false + var pipeFDs: [Int32] = [-1, -1] + pipeFDs.withUnsafeMutableBufferPointer { ptr in + XCTAssertEqual(0, pipe(ptr.baseAddress!), "pipe failed: \(errno))") + } + defer { + for fd in pipeFDs where fd >= 0 { + close(fd) + } + } + + let pipeWriteFD = pipeFDs[1] + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/bash", + ["-c", "echo hello from child >&\(pipeWriteFD); echo wrote into \(pipeWriteFD), echo exit code $?"], + spawnOptions: spawnOptions, + collectStandardOutput: true, + collectStandardError: true + ) + close(pipeFDs[1]) + pipeFDs[1] = -1 + var readBytes: [UInt8] = Array(repeating: 0, count: 1024) + let readBytesCount = try readBytes.withUnsafeMutableBytes { readBytesPtr in + try FileDescriptor(rawValue: pipeFDs[0]).read(into: readBytesPtr, retryOnInterrupt: true) + } + XCTAssertEqual(17, readBytesCount) + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertEqual("wrote into \(pipeWriteFD), echo exit code 0\n", String(buffer: result.standardOutput!)) + XCTAssertEqual("", String(buffer: result.standardError!)) + XCTAssertEqual( + "hello from child\n", + String(decoding: readBytes.prefix { $0 != 0 }, as: UTF8.self) + ) + } + + func testDoesNotInheritRandomFileDescriptorsByDefault() async throws { + let spawnOptions = ProcessExecutor.SpawnOptions.default + var pipeFDs: [Int32] = [-1, -1] + pipeFDs.withUnsafeMutableBufferPointer { ptr in + XCTAssertEqual(0, pipe(ptr.baseAddress!), "pipe failed: \(errno))") + } + defer { + for fd in pipeFDs where fd >= 0 { + close(fd) + } + } + + let pipeWriteFD = pipeFDs[1] + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/bash", + ["-c", "echo hello from child >&\(pipeWriteFD); echo wrote into \(pipeWriteFD), echo exit code $?"], + spawnOptions: spawnOptions, + collectStandardOutput: true, + collectStandardError: true + ) + close(pipeFDs[1]) + pipeFDs[1] = -1 + var readBytes: [UInt8] = Array(repeating: 0, count: 1024) + let readBytesCount = try readBytes.withUnsafeMutableBytes { readBytesPtr in + try FileDescriptor(rawValue: pipeFDs[0]).read(into: readBytesPtr, retryOnInterrupt: true) + } + XCTAssertEqual(0, readBytesCount) + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertEqual("wrote into \(pipeWriteFD), echo exit code 1\n", String(buffer: result.standardOutput!)) + XCTAssertNotEqual("", String(buffer: result.standardError!)) + XCTAssertEqual("", String(decoding: readBytes.prefix { $0 != 0 }, as: UTF8.self)) + } + func testCanChangeCWD() async throws { + var spawnOptions = ProcessExecutor.SpawnOptions.default + spawnOptions.changedWorkingDirectory = "/" + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/bash", + ["-c", "echo $PWD"], + spawnOptions: spawnOptions, + collectStandardOutput: true, + collectStandardError: true + ) + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertEqual("/\n", String(buffer: result.standardOutput!)) + XCTAssertEqual("", String(buffer: result.standardError!)) + } + + func testCanChangeCWDToNonExistent() async throws { + var spawnOptions = ProcessExecutor.SpawnOptions.default + spawnOptions.changedWorkingDirectory = "/dev/null/does/not/exist" + do { + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/bash", + ["-c", "pwd"], + spawnOptions: spawnOptions, + collectStandardOutput: true, + collectStandardError: true + ) + XCTFail("succeeded but shouldn't have: \(result)") + } catch { + XCTAssertEqual(NSCocoaErrorDomain, (error as NSError).domain, "\(error)") + XCTAssertEqual(NSFileNoSuchFileError, (error as NSError).code, "\(error)") + } + } + + func testCanReadThePid() async throws { + let (inputConsumer, inputProducer) = AsyncStream.justMakeIt(elementType: ByteBuffer.self) + let p = ProcessExecutor( + executable: "/bin/bash", + ["-c", #"read -r line && echo "$line" && echo ok $$"#], + standardInput: inputConsumer, + standardOutput: .stream, + standardError: .stream, + logger: self.logger + ) + async let resultAsync = p.run() + async let stdoutAsync = Array(p.standardOutput) + async let stderrAsync = Array(p.standardError) + var pid: pid_t? = nil + + // Wait until pid goes `nil` -> actual pid + while pid == nil { + pid = p.bestEffortProcessIdentifier + if pid != nil { continue } + self.logger.info("no pid yet, waiting", metadata: ["process": "\(p)"]) + try? await Task.sleep(nanoseconds: 100_000_000) + } + inputProducer.yield(ByteBuffer(string: "hello world\n")) + inputProducer.finish() + let result = try await resultAsync + let stdout = try await stdoutAsync + let stderr = try await stderrAsync + XCTAssertEqual(.exit(0), result) + XCTAssertEqual( + "hello world\nok \(pid ?? -1)\n", + String(buffer: stdout.reduce(into: ByteBuffer(), { acc, next in acc.writeImmutableBuffer(next) })) + ) + XCTAssertEqual( + "", + String(buffer: stderr.reduce(into: ByteBuffer(), { acc, next in acc.writeImmutableBuffer(next) })) + ) + + // Wait until pid goes actual pid -> `nil` + while pid != nil { + pid = p.bestEffortProcessIdentifier + if pid == nil { continue } + self.logger.info("pid still set, waiting", metadata: ["process": "\(p)"]) + try? await Task.sleep(nanoseconds: 100_000_000) + } + } + + // MARK: - Setup/teardown override func setUp() async throws { self.group = MultiThreadedEventLoopGroup(numberOfThreads: 3) self.logger = Logger(label: "test", factory: { _ in SwiftLogNoOpLogHandler() }) + + // Make sure the singleton threads have booted (because they use file descriptors) + try await MultiThreadedEventLoopGroup.singleton.next().submit {}.get() + self.highestFD = highestOpenFD() } override func tearDown() { + #if ASYNC_PROCESS_ENABLE_TESTS_WITH_PLATFORM_ASSUMPTIONS + var highestFD: CInt? = nil + for attempt in 0..<10 where highestFD != self.highestFD { + if highestFD != nil { + self.logger.debug( + "fd number differs", + metadata: [ + "before-test": "\(self.highestFD.debugDescription)", + "after-test": "\(highestFD.debugDescription)", + "attempt": "\(attempt)", + ] + ) + usleep(100_000) + } + highestFD = highestOpenFD() + } + XCTAssertEqual(self.highestFD, highestFD, "\(blockingLSOF(pid: getpid()))") + #endif + self.highestFD = nil self.logger = nil XCTAssertNoThrow(try self.group.syncShutdownGracefully()) @@ -1208,13 +1409,13 @@ extension ProcessExecutor { func runGetAllOutput() async throws -> AllOfAProcess { try await withThrowingTaskGroup(of: What.self, returning: AllOfAProcess.self) { group in group.addTask { - try await .exit(self.run()) + return .exit(try await self.run()) } group.addTask { - try await .stdout(self.standardOutput.pullAllOfIt()) + return .stdout(try await self.standardOutput.pullAllOfIt()) } group.addTask { - try await .stderr(self.standardError.pullAllOfIt()) + return .stderr(try await self.standardError.pullAllOfIt()) } var exitReason: ProcessExitReason? @@ -1223,11 +1424,11 @@ extension ProcessExecutor { while let next = try await group.next() { switch next { - case let .exit(value): + case .exit(let value): exitReason = value - case let .stderr(value): + case .stderr(let value): stderr = value - case let .stdout(value): + case .stdout(let value): stdout = value } } @@ -1236,3 +1437,76 @@ extension ProcessExecutor { } } } + +private func highestOpenFD() -> CInt? { + #if os(macOS) + guard let dirPtr = opendir("/dev/fd") else { + return nil + } + #elseif os(Linux) + guard let dirPtr = opendir("/proc/self/fd") else { + return nil + } + #else + return nil + #endif + defer { + closedir(dirPtr) + } + var highestFDSoFar = CInt(0) + + while let dirEntPtr = readdir(dirPtr) { + var entryName = dirEntPtr.pointee.d_name + let thisFD = withUnsafeBytes(of: &entryName) { entryNamePtr -> CInt? in + + CInt(String(decoding: entryNamePtr.prefix(while: { $0 != 0 }), as: Unicode.UTF8.self)) + } + highestFDSoFar = max(thisFD ?? -1, highestFDSoFar) + } + + return highestFDSoFar +} + +private func runLSOF(pid: pid_t) async throws -> String { + #if canImport(Darwin) + let lsofPath = "/usr/sbin/lsof" + #else + let lsofPath = "/usr/bin/lsof" + #endif + let result = try await ProcessExecutor.runCollectingOutput( + executable: lsofPath, + ["-Pnp", "\(pid)"], + collectStandardOutput: true, + collectStandardError: true + ) + let outString = """ + exit code: \(result.exitReason)\n + ## stdout + \(String(buffer: result.standardOutput!)) + + ## stderr + \(String(buffer: result.standardError!)) + + """ + return outString +} + +private func blockingLSOF(pid: pid_t) -> String { + let box = NIOLockedValueBox("n/a") + let sem = DispatchSemaphore(value: 0) + Task { + defer { + sem.signal() + } + do { + let outString = try await runLSOF(pid: pid) + box.withLockedValue { $0 = outString } + } catch { + box.withLockedValue { debugString in + debugString = "ERROR: \(error)" + } + } + } + _ = sem.wait(timeout: .now() + 10) + return box.withLockedValue { $0 } +}