Skip to content

Commit 84e4d29

Browse files
committed
feat: add task queue to run concurrent tasks and barrier tasks similar to DispatchQueue
1 parent c752519 commit 84e4d29

File tree

9 files changed

+474
-33
lines changed

9 files changed

+474
-33
lines changed

README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,17 @@
66
[![Swift](https://img.shields.io/badge/Swift-5.6+-orange)](https://img.shields.io/badge/Swift-5-DE5D43)
77
[![Platforms](https://img.shields.io/badge/Platforms-all-sucess)](https://img.shields.io/badge/Platforms-all-sucess)
88
[![CI/CD](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/main.yml/badge.svg?event=push)](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/main.yml)
9-
[![CodeQL](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/codeql-analysis.yml/badge.svg?event=schedule)](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/codeql-analysis.yml)
109
[![Maintainability](https://api.codeclimate.com/v1/badges/37183c809818826c1bcf/maintainability)](https://codeclimate.com/github/SwiftyLab/AsyncObjects/maintainability)
1110
[![codecov](https://codecov.io/gh/SwiftyLab/AsyncObjects/branch/main/graph/badge.svg?token=jKxMv5oFeA)](https://codecov.io/gh/SwiftyLab/AsyncObjects)
1211
<!-- [![CocoaPods Compatible](https://img.shields.io/cocoapods/v/AsyncObjects.svg?label=CocoaPods&color=C90005)](https://badge.fury.io/co/AsyncObjects) -->
12+
<!-- [![CodeQL](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/codeql-analysis.yml/badge.svg?event=schedule)](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/codeql-analysis.yml) -->
1313

14-
Several synchronization primitives introduced to aid in modern swift concurrency. The primitives are very similar to those used in other operating systems including mutexes, condition variables, shared/exclusive locks, and semaphores.
14+
Several synchronization primitives and task synchronization mechanisms introduced to aid in modern swift concurrency.
15+
16+
## Overview
17+
18+
While Swift's modern structured concurrency provides safer way of managing concurrency, it lacks many synchronization and task management features in its current state. **AsyncObjects** aims to close the functionality gap by providing following features:
19+
20+
- Easier task cancellation with ``CancellationSource``.
21+
- Introducing traditional synchronization primitives that work in non-blocking way with ``AsyncSemaphore`` and ``AsyncEvent``.
22+
- Bridging with Grand Central Dispatch and allowing usage of GCD specific patterns with ``TaskOperation`` and ``TaskQueue``.

Sources/AsyncObjects/AsyncEvent.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ public actor AsyncEvent: AsyncObject {
7575
guard !signaled else { return }
7676
let key = UUID()
7777
try? await withUnsafeThrowingContinuationCancellationHandler(
78-
handler: { [weak self] (continuation: Continuation) in
78+
handler: { [weak self] continuation in
7979
Task { [weak self] in
8080
await self?.removeContinuation(withKey: key)
8181
}
8282
},
83-
{ [weak self] (continuation: Continuation) in
83+
{ [weak self] continuation in
8484
Task { [weak self] in
8585
await self?.addContinuation(continuation, withKey: key)
8686
}
Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
# ``AsyncObjects``
22

3-
Several synchronization primitives introduced to aid in modern swift concurrency. The primitives are very similar to those used in other operating systems including mutexes, condition variables, shared/exclusive locks, and semaphores.
3+
Several synchronization primitives and task synchronization mechanisms introduced to aid in modern swift concurrency.
44

55
## Overview
66

7-
Several synchronization primitives introduced to aid in modern swift concurrency. The primitives are very similar to those used in other operating systems including mutexes, condition variables, shared/exclusive locks, and semaphores.
7+
While Swift's modern structured concurrency provides safer way of managing concurrency, it lacks many synchronization and task management features in its current state. **AsyncObjects** aims to close the functionality gap by providing following features:
8+
9+
- Easier task cancellation with ``CancellationSource``.
10+
- Introducing traditional synchronization primitives that work in non-blocking way with ``AsyncSemaphore`` and ``AsyncEvent``.
11+
- Bridging with Grand Central Dispatch and allowing usage of GCD specific patterns with ``TaskOperation`` and ``TaskQueue``.
812

913
## Topics
1014

@@ -13,7 +17,8 @@ Several synchronization primitives introduced to aid in modern swift concurrency
1317
- ``AsyncSemaphore``
1418
- ``AsyncEvent``
1519

16-
### Tasks Control
20+
### Tasks Synchronization
1721

1822
- ``CancellationSource``
1923
- ``TaskOperation``
24+
- ``TaskQueue``

Sources/AsyncObjects/AsyncSemaphore.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ public actor AsyncSemaphore: AsyncObject {
9090
if count > 0 { return }
9191
let key = UUID()
9292
try? await withUnsafeThrowingContinuationCancellationHandler(
93-
handler: { [weak self] (continuation: Continuation) in
93+
handler: { [weak self] continuation in
9494
Task { [weak self] in
9595
await self?.removeContinuation(withKey: key)
9696
}
9797
},
98-
{ [weak self] (continuation: Continuation) in
98+
{ [weak self] continuation in
9999
Task { [weak self] in
100100
await self?.addContinuation(continuation, withKey: key)
101101
}

Sources/AsyncObjects/CancellationSource.swift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public actor CancellationSource {
4949
}
5050

5151
/// Creates a new cancellation source object.
52+
///
53+
/// - Returns: The newly created cancellation source.
5254
public init() { }
5355

5456
/// Creates a new cancellation source object linking to all the provided cancellation sources.
@@ -57,6 +59,8 @@ public actor CancellationSource {
5759
/// will ensure newly created cancellation source recieve cancellation event.
5860
///
5961
/// - Parameter sources: The cancellation sources the newly created object will be linked to.
62+
///
63+
/// - Returns: The newly created cancellation source.
6064
public init(linkedWith sources: [CancellationSource]) async {
6165
await withTaskGroup(of: Void.self) { group in
6266
sources.forEach { source in
@@ -141,6 +145,8 @@ public extension Task {
141145
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
142146
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
143147
/// - operation: The operation to perform.
148+
///
149+
/// - Returns: The newly created task.
144150
@discardableResult
145151
init(
146152
priority: TaskPriority? = nil,
@@ -165,6 +171,8 @@ public extension Task {
165171
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
166172
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
167173
/// - operation: The operation to perform.
174+
///
175+
/// - Returns: The newly created task.
168176
@discardableResult
169177
init(
170178
priority: TaskPriority? = nil,
@@ -189,6 +197,8 @@ public extension Task {
189197
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
190198
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
191199
/// - operation: The operation to perform.
200+
///
201+
/// - Returns: The newly created task.
192202
@discardableResult
193203
static func detached(
194204
priority: TaskPriority? = nil,
@@ -213,6 +223,8 @@ public extension Task {
213223
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
214224
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
215225
/// - operation: The operation to perform.
226+
///
227+
/// - Returns: The newly created task.
216228
@discardableResult
217229
static func detached(
218230
priority: TaskPriority? = nil,
@@ -235,6 +247,8 @@ public extension Task {
235247
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
236248
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
237249
/// - operation: The operation to perform.
250+
///
251+
/// - Returns: The newly created task.
238252
@discardableResult
239253
init(
240254
priority: TaskPriority? = nil,
@@ -254,6 +268,8 @@ public extension Task {
254268
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
255269
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
256270
/// - operation: The operation to perform.
271+
///
272+
/// - Returns: The newly created task.
257273
@discardableResult
258274
init(
259275
priority: TaskPriority? = nil,
@@ -273,6 +289,8 @@ public extension Task {
273289
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
274290
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
275291
/// - operation: The operation to perform.
292+
///
293+
/// - Returns: The newly created task.
276294
@discardableResult
277295
static func detached(
278296
priority: TaskPriority? = nil,
@@ -293,6 +311,8 @@ public extension Task {
293311
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
294312
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
295313
/// - operation: The operation to perform.
314+
///
315+
/// - Returns: The newly created task.
296316
@discardableResult
297317
static func detached(
298318
priority: TaskPriority? = nil,

Sources/AsyncObjects/ContinuationWrapper.swift

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@
1111
/// You must not resume the continuation in closure.
1212
/// - fn: A closure that takes an `UnsafeContinuation` parameter.
1313
/// You must resume the continuation exactly once.
14+
///
1415
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
1516
/// - Returns: The value passed to the continuation.
17+
///
1618
/// - Important: The continuation provided in cancellation handler is already resumed with cancellation error.
1719
/// Trying to resume the continuation here will cause runtime error/unexpected behavior.
1820
func withUnsafeThrowingContinuationCancellationHandler<T: Sendable>(
1921
handler: @Sendable (UnsafeContinuation<T, Error>) -> Void,
2022
_ fn: (UnsafeContinuation<T, Error>) -> Void
2123
) async throws -> T {
2224
typealias Continuation = UnsafeContinuation<T, Error>
23-
let wrapper = Continuation.Wrapper()
25+
let wrapper = ContinuationWrapper<Continuation>()
2426
let value = try await withTaskCancellationHandler {
2527
guard let continuation = wrapper.value else { return }
2628
wrapper.cancel(withError: CancellationError())
@@ -36,32 +38,55 @@ func withUnsafeThrowingContinuationCancellationHandler<T: Sendable>(
3638
return value
3739
}
3840

39-
extension UnsafeContinuation {
40-
/// Wrapper type used to store `continuation` and
41-
/// provide cancellation mechanism.
42-
class Wrapper {
43-
/// The underlying continuation referenced.
44-
var value: UnsafeContinuation?
41+
/// Wrapper type used to store `continuation` and
42+
/// provide cancellation mechanism.
43+
final class ContinuationWrapper<Wrapped: Continuable> {
44+
/// The underlying continuation referenced.
45+
var value: Wrapped?
4546

46-
/// Creates a new instance with a continuation reference passed.
47-
/// By default no continuation is stored.
48-
///
49-
/// - Parameter value: A continuation reference to store.
50-
/// - Returns: The newly created continuation wrapper.
51-
init(value: UnsafeContinuation? = nil) {
52-
self.value = value
53-
}
47+
/// Creates a new instance with a continuation reference passed.
48+
/// By default no continuation is stored.
49+
///
50+
/// - Parameter value: A continuation reference to store.
51+
///
52+
/// - Returns: The newly created continuation wrapper.
53+
init(value: Wrapped? = nil) {
54+
self.value = value
55+
}
5456

55-
/// Resume continuation with passed error,
56-
/// without checking if continuation already resumed.
57-
///
58-
/// - Parameter error: Error passed to continuation.
59-
func cancel(withError error: E) {
60-
value?.resume(throwing: error)
61-
}
57+
/// Resume continuation with passed error,
58+
/// without checking if continuation already resumed.
59+
///
60+
/// - Parameter error: Error passed to continuation.
61+
func cancel(withError error: Wrapped.Failure) {
62+
value?.resume(throwing: error)
6263
}
6364
}
6465

66+
/// A type that allows to interface between synchronous and asynchronous code,
67+
/// by representing task state and allowing task resuming with some value or error.
68+
protocol Continuable: Sendable {
69+
/// The type of value to resume the continuation with in case of success.
70+
associatedtype Success
71+
/// The type of error to resume the continuation with in case of failure.
72+
associatedtype Failure: Error
73+
/// Resume the task awaiting the continuation by having it return normally from its suspension point.
74+
///
75+
/// - Parameter value: The value to return from the continuation.
76+
func resume(returning value: Success)
77+
/// Resume the task awaiting the continuation by having it throw an error from its suspension point.
78+
///
79+
/// - Parameter error: The error to throw from the continuation.
80+
func resume(throwing error: Failure)
81+
/// Resume the task awaiting the continuation by having it either return normally
82+
/// or throw an error based on the state of the given `Result` value.
83+
///
84+
/// - Parameter result: A value to either return or throw from the continuation.
85+
func resume(with result: Result<Success, Failure>)
86+
}
87+
88+
extension UnsafeContinuation: Continuable {}
89+
6590
extension UnsafeContinuation where E == Error {
6691
/// Cancel continuation by resuming with cancellation error.
6792
@inlinable

Sources/AsyncObjects/TaskOperation.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
1515
@unchecked Sendable
1616
{
1717
/// The dispatch queue used to synchronize data access and modifications.
18-
private weak var propQueue: DispatchQueue!
18+
private unowned let propQueue: DispatchQueue
1919
/// The asynchronous action to perform as part of the operation..
2020
private let underlyingAction: @Sendable () async throws -> R
2121
/// The top-level task that executes asynchronous action provided

0 commit comments

Comments
 (0)