diff --git a/Sources/SWBTestSupport/BuildOperationTester.swift b/Sources/SWBTestSupport/BuildOperationTester.swift index df0f3878..a5d1ec09 100644 --- a/Sources/SWBTestSupport/BuildOperationTester.swift +++ b/Sources/SWBTestSupport/BuildOperationTester.swift @@ -1476,7 +1476,7 @@ package final class BuildOperationTester { /// Construct the tasks for the given build parameters, and test the result. @discardableResult package func checkBuild(_ name: String? = nil, parameters: BuildParameters? = nil, runDestination: SWBProtocol.RunDestinationInfo?, buildRequest inputBuildRequest: BuildRequest? = nil, buildCommand: BuildCommand? = nil, schemeCommand: SchemeCommand? = .launch, persistent: Bool = false, serial: Bool = false, buildOutputMap: [String:String]? = nil, signableTargets: Set = [], signableTargetInputs: [String: ProvisioningTaskInputs] = [:], clientDelegate: (any ClientDelegate)? = nil, sourceLocation: SourceLocation = #_sourceLocation, body: (BuildResults) async throws -> T) async throws -> T { - try await checkBuild(name, parameters: parameters, runDestination: runDestination, buildRequest: inputBuildRequest, buildCommand: buildCommand, schemeCommand: schemeCommand, persistent: persistent, serial: serial, buildOutputMap: buildOutputMap, signableTargets: signableTargets, signableTargetInputs: signableTargetInputs, clientDelegate: clientDelegate, sourceLocation: sourceLocation, body: body, performBuild: { await $0.buildWithTimeout() }) + try await checkBuild(name, parameters: parameters, runDestination: runDestination, buildRequest: inputBuildRequest, buildCommand: buildCommand, schemeCommand: schemeCommand, persistent: persistent, serial: serial, buildOutputMap: buildOutputMap, signableTargets: signableTargets, signableTargetInputs: signableTargetInputs, clientDelegate: clientDelegate, sourceLocation: sourceLocation, body: body, performBuild: { try await $0.buildWithTimeout() }) } /// Construct the tasks for the given build parameters, and test the result. @@ -1670,7 +1670,7 @@ package final class BuildOperationTester { let operationParameters = buildRequest.parameters.replacing(activeRunDestination: runDestination, activeArchitecture: nil) let operationBuildRequest = buildRequest.with(parameters: operationParameters, buildTargets: []) - return try await checkBuild(runDestination: nil, buildRequest: buildRequest, operationBuildRequest: operationBuildRequest, persistent: persistent, sourceLocation: sourceLocation, body: body, performBuild: { await $0.buildWithTimeout() }) + return try await checkBuild(runDestination: nil, buildRequest: buildRequest, operationBuildRequest: operationBuildRequest, persistent: persistent, sourceLocation: sourceLocation, body: body, performBuild: { try await $0.buildWithTimeout() }) } package struct BuildGraphResult: Sendable { @@ -2306,8 +2306,8 @@ private let buildSystemOperationQueue = AsyncOperationQueue(concurrentTasks: 6) extension BuildSystemOperation { /// Runs the build system operation -- responds to cooperative cancellation and limited to 6 concurrent operations per process. - func buildWithTimeout() async { - await buildSystemOperationQueue.withOperation { + func buildWithTimeout() async throws { + try await buildSystemOperationQueue.withOperation { do { try await withTimeout(timeout: .seconds(1200), description: "Build system operation 20-minute limit") { await withTaskCancellationHandler { diff --git a/Sources/SWBUtil/AsyncOperationQueue.swift b/Sources/SWBUtil/AsyncOperationQueue.swift index 1146d083..1ef3b936 100644 --- a/Sources/SWBUtil/AsyncOperationQueue.swift +++ b/Sources/SWBUtil/AsyncOperationQueue.swift @@ -10,52 +10,204 @@ // //===----------------------------------------------------------------------===// -public actor AsyncOperationQueue { +import Foundation + +/// A queue for running async operations with a limit on the number of concurrent tasks. +public final class AsyncOperationQueue: @unchecked Sendable { + + // This implementation is identical to the AsyncOperationQueue in swift-package-manager. + // Any modifications made here should also be made there. + // https://github.com/swiftlang/swift-build/blob/main/Sources/SWBUtil/AsyncOperationQueue.swift#L13 + + fileprivate typealias ID = UUID + fileprivate typealias WaitingContinuation = CheckedContinuation + private let concurrentTasks: Int - private var activeTasks: Int = 0 - private var waitingTasks: [CheckedContinuation] = [] + private var waitingTasks: [WorkTask] = [] + private let waitingTasksLock = NSLock() + + fileprivate enum WorkTask { + case creating(ID) + case waiting(ID, WaitingContinuation) + case running(ID) + case cancelled(ID) + + var id: ID { + switch self { + case .creating(let id), .waiting(let id, _), .running(let id), .cancelled(let id): + return id + } + } + + var continuation: WaitingContinuation? { + guard case .waiting(_, let continuation) = self else { + return nil + } + return continuation + } + } + /// Creates an `AsyncOperationQueue` with a specified number of concurrent tasks. + /// - Parameter concurrentTasks: The maximum number of concurrent tasks that can be executed concurrently. public init(concurrentTasks: Int) { self.concurrentTasks = concurrentTasks } deinit { - if !waitingTasks.isEmpty { - preconditionFailure("Deallocated with waiting tasks") + waitingTasksLock.withLock { + if !waitingTasks.isEmpty { + preconditionFailure("Deallocated with waiting tasks") + } } } + /// Executes an asynchronous operation, ensuring that the number of concurrent tasks + // does not exceed the specified limit. + /// - Parameter operation: The asynchronous operation to execute. + /// - Returns: The result of the operation. + /// - Throws: An error thrown by the operation, or a `CancellationError` if the operation is cancelled. public func withOperation( - _ operation: @Sendable () async -> sending ReturnValue - ) async -> ReturnValue { - await waitIfNeeded() - defer { signalCompletion() } - return await operation() - } - - public func withOperation( - _ operation: @Sendable () async throws -> sending ReturnValue + _ operation: () async throws -> sending ReturnValue ) async throws -> ReturnValue { - await waitIfNeeded() - defer { signalCompletion() } + let taskId = try await waitIfNeeded() + defer { signalCompletion(taskId) } return try await operation() } - private func waitIfNeeded() async { - if activeTasks >= concurrentTasks { - await withCheckedContinuation { continuation in - waitingTasks.append(continuation) - } + private func waitIfNeeded() async throws -> ID { + let workTask = waitingTasksLock.withLock({ + let shouldWait = waitingTasks.count >= concurrentTasks + let workTask = shouldWait ? WorkTask.creating(ID()) : .running(ID()) + waitingTasks.append(workTask) + return workTask + }) + + // If we aren't creating a task that needs to wait, we're under the concurrency limit. + guard case .creating(let taskId) = workTask else { + return workTask.id } - activeTasks += 1 + enum TaskAction { + case start(WaitingContinuation) + case cancel(WaitingContinuation) + } + + try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { (continuation: WaitingContinuation) -> Void in + let action: TaskAction? = waitingTasksLock.withLock { + guard let index = waitingTasks.firstIndex(where: { $0.id == taskId }) else { + // The task may have been marked as cancelled already and then removed from + // waitingTasks in `signalCompletion`. + return .cancel(continuation) + } + + switch waitingTasks[index] { + case .cancelled: + // If the task was cancelled in between creating the task cancellation handler and acquiring the lock, + // we should resume the continuation with a `CancellationError`. + waitingTasks.remove(at: index) + return .cancel(continuation) + case .creating, .running, .waiting: + // A task may have completed since we initially checked if we should wait. Check again in this locked + // section and if we can start it, remove it from the waiting tasks and start it immediately. + if waitingTasks.count >= concurrentTasks { + waitingTasks[index] = .waiting(taskId, continuation) + return nil + } else { + waitingTasks.remove(at: index) + return .start(continuation) + } + } + } + + switch action { + case .some(.cancel(let continuation)): + continuation.resume(throwing: _Concurrency.CancellationError()) + case .some(.start(let continuation)): + continuation.resume() + case .none: + return + } + } + } onCancel: { + let continuation: WaitingContinuation? = self.waitingTasksLock.withLock { + guard let taskIndex = self.waitingTasks.firstIndex(where: { $0.id == taskId }) else { + return nil + } + + switch self.waitingTasks[taskIndex] { + case .waiting(_, let continuation): + self.waitingTasks.remove(at: taskIndex) + + // If the parent task is cancelled then we need to manually handle resuming the + // continuation for the waiting task with a `CancellationError`. Return the continuation + // here so it can be resumed once the `waitingTasksLock` is released. + return continuation + case .creating, .running: + // If the task was still being created, mark it as cancelled in `waitingTasks` so that + // the handler for `withCheckedThrowingContinuation` can immediately cancel it. + self.waitingTasks[taskIndex] = .cancelled(taskId) + return nil + case .cancelled: + preconditionFailure("Attempting to cancel a task that was already cancelled") + } + } + + continuation?.resume(throwing: _Concurrency.CancellationError()) + } + return workTask.id } - private func signalCompletion() { - activeTasks -= 1 + private func signalCompletion(_ taskId: ID) { + let continuationToResume = waitingTasksLock.withLock { () -> WaitingContinuation? in + guard !waitingTasks.isEmpty else { + return nil + } - if let continuation = waitingTasks.popLast() { - continuation.resume() + // Remove the completed task from the list to decrement the active task count. + if let taskIndex = self.waitingTasks.firstIndex(where: { $0.id == taskId }) { + waitingTasks.remove(at: taskIndex) + } + + // We cannot remove elements from `waitingTasks` while iterating over it, so we make + // a pass to collect operations and then apply them after the loop. + func createTaskListOperations() -> (CollectionDifference?, WaitingContinuation?) { + var changes: [CollectionDifference.Change] = [] + for (index, task) in waitingTasks.enumerated() { + switch task { + case .running: + // Skip tasks that are already running, looking for the first one that is waiting or creating. + continue + case .creating: + // If the next task is in the process of being created, let the + // creation code in the `withCheckedThrowingContinuation` in `waitIfNeeded` + // handle starting the task. + break + case .waiting: + // Begin the next waiting task + changes.append(.remove(offset: index, element: task, associatedWith: nil)) + return (CollectionDifference(changes), task.continuation) + case .cancelled: + // If the next task is cancelled, continue removing cancelled + // tasks until we find one that hasn't run yet, or we exaust the list of waiting tasks. + changes.append(.remove(offset: index, element: task, associatedWith: nil)) + continue + } + } + return (CollectionDifference(changes), nil) + } + + let (collectionOperations, continuation) = createTaskListOperations() + if let operations = collectionOperations { + guard let appliedDiff = waitingTasks.applying(operations) else { + preconditionFailure("Failed to apply changes to waiting tasks") + } + waitingTasks = appliedDiff + } + + return continuation } + + continuationToResume?.resume() } }