Skip to content

Commit 51b302e

Browse files
committed
feat: add operation type to bridge GCD/libdispatch with structured concurrency
1 parent 3e81653 commit 51b302e

File tree

11 files changed

+680
-455
lines changed

11 files changed

+680
-455
lines changed

AsyncObjects.xcodeproj/project.pbxproj

Lines changed: 349 additions & 333 deletions
Large diffs are not rendered by default.

Sources/AsyncObjects/AsyncEvent.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public actor AsyncEvent: AsyncObject {
4040
/// By default, event is initially in signaled state.
4141
///
4242
/// - Parameter signaled: The signal state for event.
43+
///
4344
/// - Returns: The newly created event.
4445
public init(signaledInitially signaled: Bool = true) {
4546
self.signaled = signaled

Sources/AsyncObjects/AsyncObject.swift

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ public extension AsyncObject where Self: AnyObject {
5757
@Sendable
5858
func wait(forNanoseconds duration: UInt64) async -> TaskTimeoutResult {
5959
return await waitForTaskCompletion(
60-
task: { [weak self] in await self?.wait() },
6160
withTimeoutInNanoseconds: duration
62-
)
61+
) { [weak self] in
62+
await self?.wait()
63+
}
6364
}
6465
}
6566

@@ -101,10 +102,9 @@ public func waitForAll(
101102
_ objects: [any AsyncObject],
102103
forNanoseconds duration: UInt64
103104
) async -> TaskTimeoutResult {
104-
return await waitForTaskCompletion(
105-
task: { await waitForAll(objects) },
106-
withTimeoutInNanoseconds: duration
107-
)
105+
return await waitForTaskCompletion(withTimeoutInNanoseconds: duration) {
106+
await waitForAll(objects)
107+
}
108108
}
109109

110110
/// Waits for multiple objects to green light task execution
@@ -165,10 +165,9 @@ public func waitForAny(
165165
_ objects: [any AsyncObject],
166166
forNanoseconds duration: UInt64
167167
) async -> TaskTimeoutResult {
168-
return await waitForTaskCompletion(
169-
task: { await waitForAny(objects) },
170-
withTimeoutInNanoseconds: duration
171-
)
168+
return await waitForTaskCompletion(withTimeoutInNanoseconds: duration) {
169+
await waitForAny(objects)
170+
}
172171
}
173172

174173
/// Waits for multiple objects to green light task execution
@@ -201,8 +200,8 @@ public func waitForAny(
201200
/// or timed out.
202201
@inlinable
203202
public func waitForTaskCompletion(
204-
task: @escaping @Sendable () async -> Void,
205-
withTimeoutInNanoseconds timeout: UInt64
203+
withTimeoutInNanoseconds timeout: UInt64,
204+
_ task: @escaping @Sendable () async -> Void
206205
) async -> TaskTimeoutResult {
207206
var timedOut = true
208207
await withTaskGroup(of: Bool.self) { group in

Sources/AsyncObjects/AsyncObjects.docc/AsyncObjects.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ Several synchronization primitives introduced to aid in modern swift concurrency
1616
### Tasks Control
1717

1818
- ``CancellationSource``
19+
- ``TaskOperation``

Sources/AsyncObjects/AsyncSemaphore.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public actor AsyncSemaphore: AsyncObject {
5858
/// Passing a value greater than zero is useful for managing a finite pool of resources, where the pool size is equal to the value.
5959
///
6060
/// - Parameter count: The starting value for the semaphore.
61+
///
6162
/// - Returns: The newly created semaphore.
6263
public init(value count: UInt = 0) {
6364
self.limit = count + 1

Sources/AsyncObjects/CancellationSource.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public actor CancellationSource {
7272
/// will ensure newly created cancellation source recieve cancellation event.
7373
///
7474
/// - Parameter sources: The cancellation sources the newly created object will be linked to.
75+
///
76+
/// - Returns: The newly created cancellation source.
7577
public convenience init(linkedWith sources: CancellationSource...) async {
7678
await self.init(linkedWith: sources)
7779
}
@@ -80,6 +82,8 @@ public actor CancellationSource {
8082
/// and triggers cancellation event on this object after specified timeout.
8183
///
8284
/// - Parameter nanoseconds: The delay after which cancellation event triggered.
85+
///
86+
/// - Returns: The newly created cancellation source.
8387
public convenience init(cancelAfterNanoseconds nanoseconds: UInt64) {
8488
self.init()
8589
Task { [weak self] in
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
import Foundation
2+
import Dispatch
3+
4+
/// An object that bridges asynchronous work under structured concurrency
5+
/// to Grand Central Dispatch (GCD or `libdispatch`) as `Operation`.
6+
///
7+
/// Using this object traditional `libdispatch` APIs can be used along with structured concurrency
8+
/// making concurrent task management flexible in terms of managing dependencies.
9+
///
10+
/// You can start the operation by adding it to an `OperationQueue`,
11+
/// or by manually calling the ``signal()`` or ``start()`` method.
12+
/// Wait for operation completion asynchronously by calling ``wait()`` method
13+
/// or its timeout variation ``wait(forNanoseconds:)``.
14+
public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
15+
@unchecked Sendable
16+
{
17+
/// The dispatch queue used to synchronize data access and modifications.
18+
private weak var propQueue: DispatchQueue!
19+
/// The asynchronous action to perform as part of the operation..
20+
private let underlyingAction: @Sendable () async throws -> R
21+
/// The top-level task that executes asynchronous action provided
22+
/// on behalf of the actor where operation started.
23+
private var execTask: Task<R, Error>?
24+
25+
/// A Boolean value indicating whether the operation executes its task asynchronously.
26+
///
27+
/// Always returns true, since the operation always executes its task asynchronously.
28+
public override var isConcurrent: Bool { true }
29+
/// A Boolean value indicating whether the operation executes its task asynchronously.
30+
///
31+
/// Always returns true, since the operation always executes its task asynchronously.
32+
public override var isAsynchronous: Bool { true }
33+
/// A Boolean value indicating whether the operation has been cancelled.
34+
///
35+
/// Returns whether the underlying top-level task is cancelled or not.
36+
/// The default value of this property is `false`.
37+
/// Calling the ``cancel()`` method of this object sets the value of this property to `true`.
38+
public override var isCancelled: Bool { execTask?.isCancelled ?? false }
39+
40+
/// Private store for boolean value indicating whether the operation is currently executing.
41+
private var _isExecuting: Bool = false
42+
/// A Boolean value indicating whether the operation is currently executing.
43+
///
44+
/// The value of this property is true if the operation is currently executing
45+
/// provided asynchronous operation or false if it is not.
46+
public override private(set) var isExecuting: Bool {
47+
get { propQueue.sync { _isExecuting } }
48+
set {
49+
willChangeValue(forKey: "isExecuting")
50+
propQueue.sync(flags: [.barrier]) { _isExecuting = newValue }
51+
didChangeValue(forKey: "isExecuting")
52+
}
53+
}
54+
55+
/// Private store for boolean value indicating whether the operation has finished executing its task.
56+
private var _isFinished: Bool = false
57+
/// A Boolean value indicating whether the operation has finished executing its task.
58+
///
59+
/// The value of this property is true if the operation is finished executing or cancelled
60+
/// provided asynchronous operation or false if it is not.
61+
public override private(set) var isFinished: Bool {
62+
get { propQueue.sync { _isFinished } }
63+
set {
64+
willChangeValue(forKey: "isFinished")
65+
propQueue.sync(flags: [.barrier]) {
66+
_isFinished = newValue
67+
guard newValue, !continuations.isEmpty else { return }
68+
continuations.forEach { $0.value.resume() }
69+
continuations = [:]
70+
}
71+
didChangeValue(forKey: "isFinished")
72+
}
73+
}
74+
75+
/// The result of provided asynchronous operation execution.
76+
///
77+
/// Will be success if provided operation completed successfully,
78+
/// or failure returned with error.
79+
public var result: Result<R, Error> {
80+
get async { (await execTask?.result) ?? .failure(CancellationError()) }
81+
}
82+
83+
/// Creates a new operation that executes the provided throwing asynchronous task.
84+
///
85+
/// The provided dispatch queue is used to syncronize operation property access and modifications
86+
/// and prevent data races.
87+
///
88+
/// - Parameters:
89+
/// - queue: The dispatch queue to be used to synchronize data access and modifications.
90+
/// - operation: The throwing asynchronous operation to execute.
91+
///
92+
/// - Returns: The newly created asynchronous operation.
93+
public init(
94+
queue: DispatchQueue,
95+
operation: @escaping @Sendable () async throws -> R
96+
) {
97+
self.propQueue = queue
98+
self.underlyingAction = operation
99+
super.init()
100+
}
101+
102+
/// Creates a new operation that executes the provided nonthrowing asynchronous task.
103+
///
104+
/// The provided dispatch queue is used to syncronize operation property access and modifications
105+
/// and prevent data races.
106+
///
107+
/// - Parameters:
108+
/// - queue: The dispatch queue to be used to synchronize data access and modifications.
109+
/// - operation: The nonthrowing asynchronous operation to execute.
110+
///
111+
/// - Returns: The newly created asynchronous operation.
112+
public init(
113+
queue: DispatchQueue,
114+
operation: @escaping @Sendable () async -> R
115+
) {
116+
self.propQueue = queue
117+
self.underlyingAction = operation
118+
super.init()
119+
}
120+
121+
/// Begins the execution of the operation.
122+
///
123+
/// Updates the execution state of the operation and
124+
/// runs the given operation asynchronously
125+
/// as part of a new top-level task on behalf of the current actor.
126+
public override func start() {
127+
guard !self.isFinished else { return }
128+
isFinished = false
129+
isExecuting = true
130+
main()
131+
}
132+
133+
/// Performs the provided asynchronous task.
134+
///
135+
/// Runs the given operation asynchronously
136+
/// as part of a new top-level task on behalf of the current actor.
137+
public override func main() {
138+
guard isExecuting, execTask == nil else { return }
139+
execTask = Task { [weak self] in
140+
guard let self = self else { throw CancellationError() }
141+
let result = try await underlyingAction()
142+
self.finish()
143+
return result
144+
}
145+
}
146+
147+
/// Advises the operation object that it should stop executing its task.
148+
///
149+
/// Initiates cooperative cancellation for provided asynchronous operation
150+
/// and moves to finshed state.
151+
///
152+
/// Calling this method on a task that doesn’t support cancellation has no effect.
153+
/// Likewise, if the task has already run past the last point where it would stop early,
154+
/// calling this method has no effect.
155+
public override func cancel() {
156+
execTask?.cancel()
157+
finish()
158+
}
159+
160+
/// Moves this operation to finished state.
161+
///
162+
/// Must be called either when operation completes or cancelled.
163+
@inline(__always)
164+
private func finish() {
165+
isExecuting = false
166+
isFinished = true
167+
}
168+
169+
// MARK: AsyncObject Impl
170+
/// The suspended tasks continuation type.
171+
private typealias Continuation = UnsafeContinuation<Void, Error>
172+
/// The continuations stored with an associated key for all the suspended task that are waitig for opearation completion.
173+
private var continuations: [UUID: Continuation] = [:]
174+
175+
/// Add continuation with the provided key in `continuations` map.
176+
///
177+
/// - Parameters:
178+
/// - continuation: The `continuation` to add.
179+
/// - key: The key in the map.
180+
@inline(__always)
181+
private func addContinuation(
182+
_ continuation: Continuation,
183+
withKey key: UUID
184+
) {
185+
continuations[key] = continuation
186+
}
187+
188+
/// Remove continuation associated with provided key
189+
/// from `continuations` map.
190+
///
191+
/// - Parameter key: The key in the map.
192+
@inline(__always)
193+
private func removeContinuation(withKey key: UUID) {
194+
continuations.removeValue(forKey: key)
195+
}
196+
197+
/// Starts operation asynchronously
198+
/// as part of a new top-level task on behalf of the current actor.
199+
@Sendable
200+
public func signal() {
201+
self.start()
202+
}
203+
204+
/// Waits for opearation to complete successfully or cancelled.
205+
///
206+
/// Only waits asynchronously, if opearation is executing,
207+
/// until it is completed or cancelled.
208+
@Sendable
209+
public func wait() async {
210+
guard !isFinished else { return }
211+
let key = UUID()
212+
try? await withUnsafeThrowingContinuationCancellationHandler(
213+
handler: { [weak self] (continuation: Continuation) in
214+
Task { [weak self] in
215+
self?.removeContinuation(withKey: key)
216+
}
217+
},
218+
{ [weak self] (continuation: Continuation) in
219+
Task { [weak self] in
220+
self?.addContinuation(continuation, withKey: key)
221+
}
222+
}
223+
)
224+
}
225+
}

Tests/AsyncObjectsTests/AsyncEventTests.swift

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@ class AsyncEventTests: XCTestCase {
1212
try await Task.sleep(nanoseconds: interval)
1313
await event.signal()
1414
}
15-
await checkExecInterval(
16-
for: { await event.wait() },
17-
durationInSeconds: seconds
18-
)
15+
await checkExecInterval(durationInSeconds: seconds, for: event.wait)
1916
}
2017

2118
func testEventWait() async throws {
@@ -37,12 +34,9 @@ class AsyncEventTests: XCTestCase {
3734
func testEventWaitWithTimeout() async throws {
3835
let event = AsyncEvent(signaledInitially: false)
3936
var result: TaskTimeoutResult = .success
40-
await checkExecInterval(
41-
for: {
42-
result = await event.wait(forNanoseconds: UInt64(4E9))
43-
},
44-
durationInSeconds: 4
45-
)
37+
await checkExecInterval(durationInSeconds: 4) {
38+
result = await event.wait(forNanoseconds: UInt64(4E9))
39+
}
4640
XCTAssertEqual(result, .timedOut)
4741
}
4842

@@ -53,24 +47,18 @@ class AsyncEventTests: XCTestCase {
5347
try await Task.sleep(nanoseconds: UInt64(5E9))
5448
await event.signal()
5549
}
56-
await checkExecInterval(
57-
for: {
58-
result = await event.wait(forNanoseconds: UInt64(10E9))
59-
},
60-
durationInSeconds: 5
61-
)
50+
await checkExecInterval(durationInSeconds: 5) {
51+
result = await event.wait(forNanoseconds: UInt64(10E9))
52+
}
6253
XCTAssertEqual(result, .success)
6354
}
6455

6556
func testReleasedEventWaitSuccessWithoutTimeout() async throws {
6657
let event = AsyncEvent()
6758
var result: TaskTimeoutResult = .timedOut
68-
await checkExecInterval(
69-
for: {
70-
result = await event.wait(forNanoseconds: UInt64(10E9))
71-
},
72-
durationInSeconds: 0
73-
)
59+
await checkExecInterval(durationInSeconds: 0) {
60+
result = await event.wait(forNanoseconds: UInt64(10E9))
61+
}
7462
XCTAssertEqual(result, .success)
7563
}
7664
}

0 commit comments

Comments
 (0)