Skip to content

Commit 50e386e

Browse files
authored
Merge pull request swiftlang#569 from plemarquand/async-operation-queue
2 parents caf6b9f + 76735f3 commit 50e386e

File tree

2 files changed

+182
-30
lines changed

2 files changed

+182
-30
lines changed

Sources/SWBTestSupport/BuildOperationTester.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,7 +1415,7 @@ package final class BuildOperationTester {
14151415

14161416
/// Construct the tasks for the given build parameters, and test the result.
14171417
@discardableResult package func checkBuild<T>(_ name: String? = nil, parameters: BuildParameters? = nil, runDestination: SWBProtocol.RunDestinationInfo?, buildRequest inputBuildRequest: BuildRequest? = nil, buildCommand: BuildCommand? = nil, schemeCommand: SchemeCommand? = .launch, persistent: Bool = false, serial: Bool = false, buildOutputMap: [String:String]? = nil, signableTargets: Set<String> = [], signableTargetInputs: [String: ProvisioningTaskInputs] = [:], clientDelegate: (any ClientDelegate)? = nil, sourceLocation: SourceLocation = #_sourceLocation, body: (BuildResults) async throws -> T) async throws -> T {
1418-
try await checkBuild(name, parameters: parameters, runDestination: runDestination, buildRequest: inputBuildRequest, buildCommand: buildCommand, schemeCommand: schemeCommand, persistent: persistent, serial: serial, buildOutputMap: buildOutputMap, signableTargets: signableTargets, signableTargetInputs: signableTargetInputs, clientDelegate: clientDelegate, sourceLocation: sourceLocation, body: body, performBuild: { await $0.buildWithTimeout() })
1418+
try await checkBuild(name, parameters: parameters, runDestination: runDestination, buildRequest: inputBuildRequest, buildCommand: buildCommand, schemeCommand: schemeCommand, persistent: persistent, serial: serial, buildOutputMap: buildOutputMap, signableTargets: signableTargets, signableTargetInputs: signableTargetInputs, clientDelegate: clientDelegate, sourceLocation: sourceLocation, body: body, performBuild: { try await $0.buildWithTimeout() })
14191419
}
14201420

14211421
/// Construct the tasks for the given build parameters, and test the result.
@@ -1573,7 +1573,7 @@ package final class BuildOperationTester {
15731573
let operationParameters = buildRequest.parameters.replacing(activeRunDestination: runDestination, activeArchitecture: nil)
15741574
let operationBuildRequest = buildRequest.with(parameters: operationParameters, buildTargets: [])
15751575

1576-
return try await checkBuild(runDestination: nil, buildRequest: buildRequest, operationBuildRequest: operationBuildRequest, persistent: persistent, sourceLocation: sourceLocation, body: body, performBuild: { await $0.buildWithTimeout() })
1576+
return try await checkBuild(runDestination: nil, buildRequest: buildRequest, operationBuildRequest: operationBuildRequest, persistent: persistent, sourceLocation: sourceLocation, body: body, performBuild: { try await $0.buildWithTimeout() })
15771577
}
15781578

15791579
package struct BuildGraphResult: Sendable {
@@ -2209,8 +2209,8 @@ private let buildSystemOperationQueue = AsyncOperationQueue(concurrentTasks: 6)
22092209

22102210
extension BuildSystemOperation {
22112211
/// Runs the build system operation -- responds to cooperative cancellation and limited to 6 concurrent operations per process.
2212-
func buildWithTimeout() async {
2213-
await buildSystemOperationQueue.withOperation {
2212+
func buildWithTimeout() async throws {
2213+
try await buildSystemOperationQueue.withOperation {
22142214
do {
22152215
try await withTimeout(timeout: .seconds(1200), description: "Build system operation 20-minute limit") {
22162216
await withTaskCancellationHandler {

Sources/SWBUtil/AsyncOperationQueue.swift

Lines changed: 178 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,52 +10,204 @@
1010
//
1111
//===----------------------------------------------------------------------===//
1212

13-
public actor AsyncOperationQueue {
13+
import Foundation
14+
15+
/// A queue for running async operations with a limit on the number of concurrent tasks.
16+
public final class AsyncOperationQueue: @unchecked Sendable {
17+
18+
// This implementation is identical to the AsyncOperationQueue in swift-package-manager.
19+
// Any modifications made here should also be made there.
20+
// https://github.com/swiftlang/swift-build/blob/main/Sources/SWBUtil/AsyncOperationQueue.swift#L13
21+
22+
fileprivate typealias ID = UUID
23+
fileprivate typealias WaitingContinuation = CheckedContinuation<Void, any Error>
24+
1425
private let concurrentTasks: Int
15-
private var activeTasks: Int = 0
16-
private var waitingTasks: [CheckedContinuation<Void, Never>] = []
26+
private var waitingTasks: [WorkTask] = []
27+
private let waitingTasksLock = NSLock()
28+
29+
fileprivate enum WorkTask {
30+
case creating(ID)
31+
case waiting(ID, WaitingContinuation)
32+
case running(ID)
33+
case cancelled(ID)
34+
35+
var id: ID {
36+
switch self {
37+
case .creating(let id), .waiting(let id, _), .running(let id), .cancelled(let id):
38+
return id
39+
}
40+
}
41+
42+
var continuation: WaitingContinuation? {
43+
guard case .waiting(_, let continuation) = self else {
44+
return nil
45+
}
46+
return continuation
47+
}
48+
}
1749

50+
/// Creates an `AsyncOperationQueue` with a specified number of concurrent tasks.
51+
/// - Parameter concurrentTasks: The maximum number of concurrent tasks that can be executed concurrently.
1852
public init(concurrentTasks: Int) {
1953
self.concurrentTasks = concurrentTasks
2054
}
2155

2256
deinit {
23-
if !waitingTasks.isEmpty {
24-
preconditionFailure("Deallocated with waiting tasks")
57+
waitingTasksLock.withLock {
58+
if !waitingTasks.isEmpty {
59+
preconditionFailure("Deallocated with waiting tasks")
60+
}
2561
}
2662
}
2763

64+
/// Executes an asynchronous operation, ensuring that the number of concurrent tasks
65+
// does not exceed the specified limit.
66+
/// - Parameter operation: The asynchronous operation to execute.
67+
/// - Returns: The result of the operation.
68+
/// - Throws: An error thrown by the operation, or a `CancellationError` if the operation is cancelled.
2869
public func withOperation<ReturnValue>(
29-
_ operation: @Sendable () async -> sending ReturnValue
30-
) async -> ReturnValue {
31-
await waitIfNeeded()
32-
defer { signalCompletion() }
33-
return await operation()
34-
}
35-
36-
public func withOperation<ReturnValue>(
37-
_ operation: @Sendable () async throws -> sending ReturnValue
70+
_ operation: () async throws -> sending ReturnValue
3871
) async throws -> ReturnValue {
39-
await waitIfNeeded()
40-
defer { signalCompletion() }
72+
let taskId = try await waitIfNeeded()
73+
defer { signalCompletion(taskId) }
4174
return try await operation()
4275
}
4376

44-
private func waitIfNeeded() async {
45-
if activeTasks >= concurrentTasks {
46-
await withCheckedContinuation { continuation in
47-
waitingTasks.append(continuation)
48-
}
77+
private func waitIfNeeded() async throws -> ID {
78+
let workTask = waitingTasksLock.withLock({
79+
let shouldWait = waitingTasks.count >= concurrentTasks
80+
let workTask = shouldWait ? WorkTask.creating(ID()) : .running(ID())
81+
waitingTasks.append(workTask)
82+
return workTask
83+
})
84+
85+
// If we aren't creating a task that needs to wait, we're under the concurrency limit.
86+
guard case .creating(let taskId) = workTask else {
87+
return workTask.id
4988
}
5089

51-
activeTasks += 1
90+
enum TaskAction {
91+
case start(WaitingContinuation)
92+
case cancel(WaitingContinuation)
93+
}
94+
95+
try await withTaskCancellationHandler {
96+
try await withCheckedThrowingContinuation { (continuation: WaitingContinuation) -> Void in
97+
let action: TaskAction? = waitingTasksLock.withLock {
98+
guard let index = waitingTasks.firstIndex(where: { $0.id == taskId }) else {
99+
// The task may have been marked as cancelled already and then removed from
100+
// waitingTasks in `signalCompletion`.
101+
return .cancel(continuation)
102+
}
103+
104+
switch waitingTasks[index] {
105+
case .cancelled:
106+
// If the task was cancelled in between creating the task cancellation handler and acquiring the lock,
107+
// we should resume the continuation with a `CancellationError`.
108+
waitingTasks.remove(at: index)
109+
return .cancel(continuation)
110+
case .creating, .running, .waiting:
111+
// A task may have completed since we initially checked if we should wait. Check again in this locked
112+
// section and if we can start it, remove it from the waiting tasks and start it immediately.
113+
if waitingTasks.count >= concurrentTasks {
114+
waitingTasks[index] = .waiting(taskId, continuation)
115+
return nil
116+
} else {
117+
waitingTasks.remove(at: index)
118+
return .start(continuation)
119+
}
120+
}
121+
}
122+
123+
switch action {
124+
case .some(.cancel(let continuation)):
125+
continuation.resume(throwing: _Concurrency.CancellationError())
126+
case .some(.start(let continuation)):
127+
continuation.resume()
128+
case .none:
129+
return
130+
}
131+
}
132+
} onCancel: {
133+
let continuation: WaitingContinuation? = self.waitingTasksLock.withLock {
134+
guard let taskIndex = self.waitingTasks.firstIndex(where: { $0.id == taskId }) else {
135+
return nil
136+
}
137+
138+
switch self.waitingTasks[taskIndex] {
139+
case .waiting(_, let continuation):
140+
self.waitingTasks.remove(at: taskIndex)
141+
142+
// If the parent task is cancelled then we need to manually handle resuming the
143+
// continuation for the waiting task with a `CancellationError`. Return the continuation
144+
// here so it can be resumed once the `waitingTasksLock` is released.
145+
return continuation
146+
case .creating, .running:
147+
// If the task was still being created, mark it as cancelled in `waitingTasks` so that
148+
// the handler for `withCheckedThrowingContinuation` can immediately cancel it.
149+
self.waitingTasks[taskIndex] = .cancelled(taskId)
150+
return nil
151+
case .cancelled:
152+
preconditionFailure("Attempting to cancel a task that was already cancelled")
153+
}
154+
}
155+
156+
continuation?.resume(throwing: _Concurrency.CancellationError())
157+
}
158+
return workTask.id
52159
}
53160

54-
private func signalCompletion() {
55-
activeTasks -= 1
161+
private func signalCompletion(_ taskId: ID) {
162+
let continuationToResume = waitingTasksLock.withLock { () -> WaitingContinuation? in
163+
guard !waitingTasks.isEmpty else {
164+
return nil
165+
}
56166

57-
if let continuation = waitingTasks.popLast() {
58-
continuation.resume()
167+
// Remove the completed task from the list to decrement the active task count.
168+
if let taskIndex = self.waitingTasks.firstIndex(where: { $0.id == taskId }) {
169+
waitingTasks.remove(at: taskIndex)
170+
}
171+
172+
// We cannot remove elements from `waitingTasks` while iterating over it, so we make
173+
// a pass to collect operations and then apply them after the loop.
174+
func createTaskListOperations() -> (CollectionDifference<WorkTask>?, WaitingContinuation?) {
175+
var changes: [CollectionDifference<WorkTask>.Change] = []
176+
for (index, task) in waitingTasks.enumerated() {
177+
switch task {
178+
case .running:
179+
// Skip tasks that are already running, looking for the first one that is waiting or creating.
180+
continue
181+
case .creating:
182+
// If the next task is in the process of being created, let the
183+
// creation code in the `withCheckedThrowingContinuation` in `waitIfNeeded`
184+
// handle starting the task.
185+
break
186+
case .waiting:
187+
// Begin the next waiting task
188+
changes.append(.remove(offset: index, element: task, associatedWith: nil))
189+
return (CollectionDifference<WorkTask>(changes), task.continuation)
190+
case .cancelled:
191+
// If the next task is cancelled, continue removing cancelled
192+
// tasks until we find one that hasn't run yet, or we exaust the list of waiting tasks.
193+
changes.append(.remove(offset: index, element: task, associatedWith: nil))
194+
continue
195+
}
196+
}
197+
return (CollectionDifference<WorkTask>(changes), nil)
198+
}
199+
200+
let (collectionOperations, continuation) = createTaskListOperations()
201+
if let operations = collectionOperations {
202+
guard let appliedDiff = waitingTasks.applying(operations) else {
203+
preconditionFailure("Failed to apply changes to waiting tasks")
204+
}
205+
waitingTasks = appliedDiff
206+
}
207+
208+
return continuation
59209
}
210+
211+
continuationToResume?.resume()
60212
}
61213
}

0 commit comments

Comments
 (0)