Skip to content

Commit b92665d

Browse files
committed
feat: add cancellation source for controlling multiple tasks cooperative cancellation
1 parent 80ae8b0 commit b92665d

File tree

4 files changed

+310
-2
lines changed

4 files changed

+310
-2
lines changed

Sources/AsyncObjects/AsyncEvent.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public actor AsyncEvent: AsyncObject {
7878
Task { [weak self] in
7979
await self?.removeContinuation(withKey: key)
8080
}
81-
}, { [weak self] (continuation: Continuation) in
81+
},
82+
{ [weak self] (continuation: Continuation) in
8283
Task { [weak self] in
8384
await self?.addContinuation(continuation, withKey: key)
8485
}

Sources/AsyncObjects/AsyncObjects.docc/AsyncObjects.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ Several synchronization primitives introduced to aid in modern swift concurrency
1212

1313
- ``AsyncSemaphore``
1414
- ``AsyncEvent``
15+
16+
### Tasks Control
17+
18+
- ``CancellationSource``

Sources/AsyncObjects/AsyncSemaphore.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ public actor AsyncSemaphore: AsyncObject {
9393
Task { [weak self] in
9494
await self?.removeContinuation(withKey: key)
9595
}
96-
}, { [weak self] (continuation: Continuation) in
96+
},
97+
{ [weak self] (continuation: Continuation) in
9798
Task { [weak self] in
9899
await self?.addContinuation(continuation, withKey: key)
99100
}
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
import Foundation
2+
3+
/// An object that controls cooperative cancellation of multiple registered tasks and linked object registered tasks.
4+
///
5+
/// An async event suspends tasks if current state is non-signaled and resumes execution when event is signaled.
6+
///
7+
/// You can register tasks for cancellation using the ``register(task:)`` method
8+
/// and link with additional sources by creating object with ``init(linkedWith:)`` method.
9+
/// By calling the ``cancel()`` method all the reigistered tasks will be cancelled
10+
/// and the cancellation event will be propagated to linked cancellation sources,
11+
/// which in turn cancels their rigistered tasks and further propagates cancellation.
12+
///
13+
/// - Warning: Cancellation sources propagate cancellation event to other linked cancellation sources.
14+
/// In case of circular dependency between cancellation sources, app will go into infinite recursion.
15+
public actor CancellationSource {
16+
/// All the rigistered tasks for cooperative cancellation.
17+
private var registeredTasks: [AnyHashable: () -> Void] = [:]
18+
/// All the linked cancellation sources that cancellation event will be propagated.
19+
///
20+
/// - TODO: Store weak reference for cancellation sources.
21+
/// ```swift
22+
/// private var linkedSources: NSHashTable<CancellationSource> = .weakObjects()
23+
/// ```
24+
private var linkedSources: [CancellationSource] = []
25+
26+
/// Add task to registered cooperative cancellation tasks list.
27+
///
28+
/// - Parameter task: The task to register.
29+
@inline(__always)
30+
private func add<Success, Faliure>(task: Task<Success, Faliure>) {
31+
guard !task.isCancelled else { return }
32+
registeredTasks[task] = { task.cancel() }
33+
}
34+
35+
/// Remove task from registered cooperative cancellation tasks list.
36+
///
37+
/// - Parameter task: The task to remove.
38+
@inline(__always)
39+
private func remove<Success, Faliure>(task: Task<Success, Faliure>) {
40+
registeredTasks.removeValue(forKey: task)
41+
}
42+
43+
/// Add cancellation source to linked cancellation sources list to propagate cancellation event.
44+
///
45+
/// - Parameter task: The source to link.
46+
@inline(__always)
47+
private func addSource(_ source: CancellationSource) {
48+
linkedSources.append(source)
49+
}
50+
51+
/// Creates a new cancellation source object.
52+
public init() { }
53+
54+
/// Creates a new cancellation source object linking to all the provided cancellation sources.
55+
///
56+
/// Initiating cancellation in any of the provided cancellation sources
57+
/// will ensure newly created cancellation source recieve cancellation event.
58+
///
59+
/// - Parameter sources: The cancellation sources the newly created object will be linked to.
60+
public init(linkedWith sources: [CancellationSource]) async {
61+
await withTaskGroup(of: Void.self) { group in
62+
sources.forEach { source in
63+
group.addTask { await source.addSource(self) }
64+
}
65+
await group.waitForAll()
66+
}
67+
}
68+
69+
/// Creates a new cancellation source object linking to all the provided cancellation sources.
70+
///
71+
/// Initiating cancellation in any of the provided cancellation sources
72+
/// will ensure newly created cancellation source recieve cancellation event.
73+
///
74+
/// - Parameter sources: The cancellation sources the newly created object will be linked to.
75+
public convenience init(linkedWith sources: CancellationSource...) async {
76+
await self.init(linkedWith: sources)
77+
}
78+
79+
/// Creates a new cancellation source object
80+
/// and triggers cancellation event on this object after specified timeout.
81+
///
82+
/// - Parameter nanoseconds: The delay after which cancellation event triggered.
83+
public convenience init(cancelAfterNanoseconds nanoseconds: UInt64) {
84+
self.init()
85+
Task { [weak self] in
86+
try await self?.cancel(afterNanoseconds: nanoseconds)
87+
}
88+
}
89+
90+
/// Register task for cooperative cancellation when cancellation event recieved on cancellation source.
91+
///
92+
/// If task completes before cancellation event is triggered, it is automatically unregistered.
93+
///
94+
/// - Parameter task: The task to register.
95+
@Sendable
96+
public func register<Success, Faliure>(task: Task<Success, Faliure>) {
97+
add(task: task)
98+
Task { [weak self] in
99+
let _ = await task.result
100+
await self?.remove(task: task)
101+
}
102+
}
103+
104+
/// Trigger cancellation event, initiate cooperative cancellation of registered tasks
105+
/// and propagate cancellation to linked cancellation sources.
106+
@Sendable
107+
public func cancel() async {
108+
registeredTasks.forEach { $1() }
109+
registeredTasks = [:]
110+
await withTaskGroup(of: Void.self) { group in
111+
linkedSources.forEach { group.addTask(operation: $0.cancel) }
112+
await group.waitForAll()
113+
}
114+
}
115+
116+
/// Trigger cancellation event after provided delay,
117+
/// initiate cooperative cancellation of registered tasks
118+
/// and propagate cancellation to linked cancellation sources.
119+
///
120+
/// - Parameter nanoseconds: The delay after which cancellation event triggered.
121+
@Sendable
122+
public func cancel(afterNanoseconds nanoseconds: UInt64) async throws {
123+
try await Task.sleep(nanoseconds: nanoseconds)
124+
await cancel()
125+
}
126+
}
127+
128+
public extension Task {
129+
/// Runs the given nonthrowing operation asynchronously as part of a new task on behalf of the current actor,
130+
/// with the provided cancellation source controlling cooperative cancellation.
131+
///
132+
/// A child task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
133+
/// In the event of cancellation child task is cancelled, while returning the value in the returned task.
134+
/// In case you want to register and track the top-level task for cancellation use the async initializer instead.
135+
///
136+
/// - Parameters:
137+
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
138+
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
139+
/// - operation: The operation to perform.
140+
@discardableResult
141+
init(
142+
priority: TaskPriority? = nil,
143+
cancellationSource: CancellationSource,
144+
operation: @escaping @Sendable () async -> Success
145+
) where Failure == Never {
146+
self.init(priority: priority) {
147+
let task = Self.init(priority: priority, operation: operation)
148+
await cancellationSource.register(task: task)
149+
return await task.value
150+
}
151+
}
152+
153+
/// Runs the given throwing operation asynchronously as part of a new task on behalf of the current actor,
154+
/// with the provided cancellation source controlling cooperative cancellation.
155+
///
156+
/// A child task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
157+
/// In the event of cancellation child task is cancelled, while propagating error in the returned task.
158+
/// In case you want to register and track the top-level task for cancellation use the async initializer instead.
159+
///
160+
/// - Parameters:
161+
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
162+
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
163+
/// - operation: The operation to perform.
164+
@discardableResult
165+
init(
166+
priority: TaskPriority? = nil,
167+
cancellationSource: CancellationSource,
168+
operation: @escaping @Sendable () async throws -> Success
169+
) rethrows where Failure == Error {
170+
self.init(priority: priority) {
171+
let task = Self.init(priority: priority, operation: operation)
172+
await cancellationSource.register(task: task)
173+
return try await task.value
174+
}
175+
}
176+
177+
/// Runs the given nonthrowing operation asynchronously as part of a new task,
178+
/// with the provided cancellation source controlling cooperative cancellation.
179+
///
180+
/// A child task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
181+
/// In the event of cancellation child task is cancelled, while returning the value in the returned task.
182+
/// In case you want to register and track the top-level task for cancellation use the async initializer instead.
183+
///
184+
/// - Parameters:
185+
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
186+
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
187+
/// - operation: The operation to perform.
188+
@discardableResult
189+
static func detached(
190+
priority: TaskPriority? = nil,
191+
cancellationSource: CancellationSource,
192+
operation: @escaping @Sendable () async -> Success
193+
) -> Self where Failure == Never {
194+
return Task.detached(priority: priority) {
195+
let task = Self.init(priority: priority, operation: operation)
196+
await cancellationSource.register(task: task)
197+
return await task.value
198+
}
199+
}
200+
201+
/// Runs the given throwing operation asynchronously as part of a new task,
202+
/// with the provided cancellation source controlling cooperative cancellation.
203+
///
204+
/// A child task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
205+
/// In the event of cancellation child task is cancelled, while returning the value in the returned task.
206+
/// In case you want to register and track the top-level task for cancellation use the async initializer instead.
207+
///
208+
/// - Parameters:
209+
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
210+
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
211+
/// - operation: The operation to perform.
212+
@discardableResult
213+
static func detached(
214+
priority: TaskPriority? = nil,
215+
cancellationSource: CancellationSource,
216+
operation: @escaping @Sendable () async throws -> Success
217+
) rethrows -> Self where Failure == Error {
218+
return Task.detached(priority: priority) {
219+
let task = Self.init(priority: priority, operation: operation)
220+
await cancellationSource.register(task: task)
221+
return try await task.value
222+
}
223+
}
224+
225+
/// Runs the given nonthrowing operation asynchronously as part of a new top-level task on behalf of the current actor,
226+
/// with the provided cancellation source controlling cooperative cancellation.
227+
///
228+
/// The created task will be cancelled when cancellation event triggered on the provided cancellation source.
229+
///
230+
/// - Parameters:
231+
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
232+
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
233+
/// - operation: The operation to perform.
234+
@discardableResult
235+
init(
236+
priority: TaskPriority? = nil,
237+
cancellationSource: CancellationSource,
238+
operation: @escaping @Sendable () async -> Success
239+
) async where Failure == Never {
240+
self.init(priority: priority, operation: operation)
241+
await cancellationSource.register(task: self)
242+
}
243+
244+
/// Runs the given throwing operation asynchronously as part of a new top-level task on behalf of the current actor,
245+
/// with the provided cancellation source controlling cooperative cancellation.
246+
///
247+
/// The created task will be cancelled when cancellation event triggered on the provided cancellation source.
248+
///
249+
/// - Parameters:
250+
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
251+
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
252+
/// - operation: The operation to perform.
253+
@discardableResult
254+
init(
255+
priority: TaskPriority? = nil,
256+
cancellationSource: CancellationSource,
257+
operation: @escaping @Sendable () async throws -> Success
258+
) async rethrows where Failure == Error {
259+
self.init(priority: priority, operation: operation)
260+
await cancellationSource.register(task: self)
261+
}
262+
263+
/// Runs the given nonthrowing operation asynchronously as part of a new top-level task,
264+
/// with the provided cancellation source controlling cooperative cancellation.
265+
///
266+
/// The created task will be cancelled when cancellation event triggered on the provided cancellation source.
267+
///
268+
/// - Parameters:
269+
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
270+
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
271+
/// - operation: The operation to perform.
272+
@discardableResult
273+
static func detached(
274+
priority: TaskPriority? = nil,
275+
cancellationSource: CancellationSource,
276+
operation: @escaping @Sendable () async -> Success
277+
) async -> Self where Failure == Never {
278+
let task = Task.detached(priority: priority, operation: operation)
279+
await cancellationSource.register(task: task)
280+
return task
281+
}
282+
283+
/// Runs the given throwing operation asynchronously as part of a new top-level task,
284+
/// with the provided cancellation source controlling cooperative cancellation.
285+
///
286+
/// The created task will be cancelled when cancellation event triggered on the provided cancellation source.
287+
///
288+
/// - Parameters:
289+
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
290+
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
291+
/// - operation: The operation to perform.
292+
@discardableResult
293+
static func detached(
294+
priority: TaskPriority? = nil,
295+
cancellationSource: CancellationSource,
296+
operation: @escaping @Sendable () async throws -> Success
297+
) async rethrows -> Self where Failure == Error {
298+
let task = Task.detached(priority: priority, operation: operation)
299+
await cancellationSource.register(task: task)
300+
return task
301+
}
302+
}

0 commit comments

Comments
 (0)