@@ -27,29 +27,23 @@ extension Task: AnyTask {
27
27
}
28
28
29
29
/// A type that is able to track dependencies between tasks.
30
- package protocol DependencyTracker : Sendable {
31
- /// Which tasks need to finish before a task described by `self` may start executing.
32
- /// `pendingTasks` is sorted in the order in which the tasks were enqueued to `AsyncQueue`.
33
- func dependencies( in pendingTasks: [ PendingTask < Self > ] ) -> [ PendingTask < Self > ]
30
+ package protocol DependencyTracker : Sendable , Hashable {
31
+ /// Whether the task described by `self` needs to finish executing before `other` can start executing.
32
+ func isDependency( of other: Self ) -> Bool
34
33
}
35
34
36
35
/// A dependency tracker where each task depends on every other, i.e. a serial
37
36
/// queue.
38
37
package struct Serial : DependencyTracker {
39
- package func dependencies( in pendingTasks: [ PendingTask < Self > ] ) -> [ PendingTask < Self > ] {
40
- if let lastTask = pendingTasks. last {
41
- return [ lastTask]
42
- }
43
- return [ ]
38
+ package func isDependency( of other: Serial ) -> Bool {
39
+ return true
44
40
}
45
41
}
46
42
47
- package struct PendingTask < TaskMetadata: Sendable > : Sendable {
43
+ package struct PendingTask < TaskMetadata: Sendable & Hashable > : Sendable {
48
44
/// The task that is pending.
49
45
fileprivate let task : any AnyTask
50
46
51
- package let metadata : TaskMetadata
52
-
53
47
/// A unique value used to identify the task. This allows tasks to get
54
48
/// removed from `pendingTasks` again after they finished executing.
55
49
fileprivate let id : UUID
@@ -58,23 +52,25 @@ package struct PendingTask<TaskMetadata: Sendable>: Sendable {
58
52
/// A list of pending tasks that can be sent across actor boundaries and is guarded by a lock.
59
53
///
60
54
/// - Note: Unchecked sendable because the tasks are being protected by a lock.
61
- private class PendingTasks < TaskMetadata: Sendable > : @ unchecked Sendable {
55
+ private final class PendingTasks < TaskMetadata: Sendable & Hashable > : Sendable {
62
56
/// Lock guarding `pendingTasks`.
63
57
private let lock = NSLock ( )
64
58
65
59
/// Pending tasks that have not finished execution yet.
66
60
///
67
61
/// - Important: This must only be accessed while `lock` has been acquired.
68
- private var tasks : [ PendingTask < TaskMetadata > ] = [ ]
62
+ private nonisolated ( unsafe ) var tasksByMetadata : [ TaskMetadata : [ PendingTask < TaskMetadata > ] ] = [ : ]
69
63
70
64
init ( ) {
71
65
self . lock. name = " AsyncQueue "
72
66
}
73
67
74
68
/// Capture a lock and execute the closure, which may modify the pending tasks.
75
- func withLock< T> ( _ body: ( _ pendingTasks: inout [ PendingTask < TaskMetadata > ] ) throws -> T ) rethrows -> T {
69
+ func withLock< T> (
70
+ _ body: ( _ tasksByMetadata: inout [ TaskMetadata : [ PendingTask < TaskMetadata > ] ] ) throws -> T
71
+ ) rethrows -> T {
76
72
try lock. withLock {
77
- try body ( & tasks )
73
+ try body ( & tasksByMetadata )
78
74
}
79
75
}
80
76
}
@@ -122,10 +118,29 @@ package final class AsyncQueue<TaskMetadata: DependencyTracker>: Sendable {
122
118
) -> Task < Success , any Error > {
123
119
let id = UUID ( )
124
120
125
- return pendingTasks. withLock { tasks in
121
+ return pendingTasks. withLock { tasksByMetadata in
126
122
// Build the list of tasks that need to finished execution before this one
127
123
// can be executed
128
- let dependencies = metadata. dependencies ( in: tasks)
124
+ var dependencies : [ PendingTask < TaskMetadata > ] = [ ]
125
+ for (pendingMetadata, pendingTasks) in tasksByMetadata {
126
+ guard pendingMetadata. isDependency ( of: metadata) else {
127
+ // No dependency
128
+ continue
129
+ }
130
+ if metadata. isDependency ( of: metadata) , let lastPendingTask = pendingTasks. last {
131
+ // This kind of task depends on all other tasks of the same kind finishing. It is sufficient to just wait on
132
+ // the last task with this metadata, it will have all the other tasks with the same metadata as transitive
133
+ // dependencies.
134
+ dependencies. append ( lastPendingTask)
135
+ } else {
136
+ // We depend on tasks with this metadata, but they don't have any dependencies between them, eg.
137
+ // `documentUpdate` depends on all `documentRequest` but `documentRequest` don't have dependencies between
138
+ // them. We need to depend on all of them unless we knew that we depended on some other task that already
139
+ // depends on all of these. But determining that would also require knowledge about the entire dependency
140
+ // graph, which is likely as expensive as depending on all of these tasks.
141
+ dependencies += pendingTasks
142
+ }
143
+ }
129
144
130
145
// Schedule the task.
131
146
let task = Task ( priority: priority) { [ pendingTasks] in
@@ -139,14 +154,17 @@ package final class AsyncQueue<TaskMetadata: DependencyTracker>: Sendable {
139
154
140
155
let result = try await operation ( )
141
156
142
- pendingTasks. withLock { tasks in
143
- tasks. removeAll ( where: { $0. id == id } )
157
+ pendingTasks. withLock { tasksByMetadata in
158
+ tasksByMetadata [ metadata, default: [ ] ] . removeAll ( where: { $0. id == id } )
159
+ if tasksByMetadata [ metadata] ? . isEmpty ?? false {
160
+ tasksByMetadata [ metadata] = nil
161
+ }
144
162
}
145
163
146
164
return result
147
165
}
148
166
149
- tasks . append ( PendingTask ( task: task, metadata : metadata , id: id) )
167
+ tasksByMetadata [ metadata , default : [ ] ] . append ( PendingTask ( task: task, id: id) )
150
168
151
169
return task
152
170
}
0 commit comments