Skip to content

Commit 3899792

Browse files
committed
refactor: use CheckedContinuation for debug mode or for ASYNCOBJECTS_USE_CHECKEDCONTINUATION flag
1 parent bbb8188 commit 3899792

File tree

6 files changed

+135
-95
lines changed

6 files changed

+135
-95
lines changed

Sources/AsyncObjects/AsyncEvent.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import Foundation
88
/// Wait for event signal by calling ``wait()`` method or its timeout variation ``wait(forNanoseconds:)``.
99
public actor AsyncEvent: AsyncObject {
1010
/// The suspended tasks continuation type.
11-
private typealias Continuation = UnsafeContinuation<Void, Error>
11+
private typealias Continuation = GlobalContinuation<Void, Error>
1212
/// The continuations stored with an associated key for all the suspended task that are waitig for event signal.
1313
private var continuations: [UUID: Continuation] = [:]
1414
/// Indicates whether current stateof event is signaled.
@@ -74,7 +74,7 @@ public actor AsyncEvent: AsyncObject {
7474
public func wait() async {
7575
guard !signaled else { return }
7676
let key = UUID()
77-
try? await withUnsafeThrowingContinuationCancellationHandler(
77+
try? await withThrowingContinuationCancellationHandler(
7878
handler: { [weak self] continuation in
7979
Task { [weak self] in
8080
await self?.removeContinuation(withKey: key)

Sources/AsyncObjects/AsyncSemaphore.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import OrderedCollections
1111
/// or its timeout variation ``wait(forNanoseconds:)``.
1212
public actor AsyncSemaphore: AsyncObject {
1313
/// The suspended tasks continuation type.
14-
private typealias Continuation = UnsafeContinuation<Void, Error>
14+
private typealias Continuation = GlobalContinuation<Void, Error>
1515
/// The continuations stored with an associated key for all the suspended task that are waitig for access to resource.
1616
private var continuations: OrderedDictionary<UUID, Continuation> = [:]
1717
/// Pool size for concurrent resource access.
@@ -89,7 +89,7 @@ public actor AsyncSemaphore: AsyncObject {
8989
count -= 1
9090
if count > 0 { return }
9191
let key = UUID()
92-
try? await withUnsafeThrowingContinuationCancellationHandler(
92+
try? await withThrowingContinuationCancellationHandler(
9393
handler: { [weak self] continuation in
9494
Task { [weak self] in
9595
await self?.removeContinuation(withKey: key)

Sources/AsyncObjects/ContinuationWrapper.swift

Lines changed: 86 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,35 @@
1-
/// Suspends the current task, then calls the given closure with an unsafe throwing continuation for the current task.
1+
/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
22
/// Continuation is cancelled with error if current task is cancelled and cancellation handler is immediately invoked.
33
///
4-
/// This operation cooperatively checks for cancellation and reacting to it by cancelling the unsafe throwing continuation with an error
4+
/// This operation cooperatively checks for cancellation and reacting to it by cancelling the throwing continuation with an error
55
/// and the cancellation handler is always and immediately invoked after that.
66
/// For example, even if the operation is running code that never checks for cancellation,
77
/// a cancellation handler still runs and provides a chance to run some cleanup code.
88
///
99
/// - Parameters:
1010
/// - handler: A closure that is called after cancelling continuation.
1111
/// You must not resume the continuation in closure.
12-
/// - fn: A closure that takes an `UnsafeContinuation` parameter.
12+
/// - fn: A closure that takes a throwing continuation parameter.
1313
/// You must resume the continuation exactly once.
1414
///
15-
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
1615
/// - Returns: The value passed to the continuation.
16+
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
1717
///
1818
/// - Important: The continuation provided in cancellation handler is already resumed with cancellation error.
1919
/// Trying to resume the continuation here will cause runtime error/unexpected behavior.
20-
func withUnsafeThrowingContinuationCancellationHandler<T: Sendable>(
21-
handler: @Sendable (UnsafeContinuation<T, Error>) -> Void,
22-
_ fn: (UnsafeContinuation<T, Error>) -> Void
23-
) async throws -> T {
24-
typealias Continuation = UnsafeContinuation<T, Error>
25-
let wrapper = ContinuationWrapper<Continuation>()
20+
func withThrowingContinuationCancellationHandler<C: ThrowingContinuable>(
21+
handler: @Sendable (C) -> Void,
22+
_ fn: (C) -> Void
23+
) async throws -> C.Success where C.Failure == Error {
24+
let wrapper = ContinuationWrapper<C>()
2625
let value = try await withTaskCancellationHandler {
2726
guard let continuation = wrapper.value else { return }
2827
wrapper.cancel(withError: CancellationError())
2928
handler(continuation)
30-
} operation: { () -> T in
31-
let value = try await withUnsafeThrowingContinuation {
32-
(c: Continuation) in
33-
wrapper.value = c
34-
fn(c)
29+
} operation: { () -> C.Success in
30+
let value = try await C.with { continuation in
31+
wrapper.value = continuation
32+
fn(continuation)
3533
}
3634
return value
3735
}
@@ -85,10 +83,80 @@ protocol Continuable: Sendable {
8583
func resume(with result: Result<Success, Failure>)
8684
}
8785

88-
extension UnsafeContinuation: Continuable {}
89-
90-
extension UnsafeContinuation where E == Error {
86+
extension Continuable where Failure == Error {
9187
/// Cancel continuation by resuming with cancellation error.
9288
@inlinable
9389
func cancel() { self.resume(throwing: CancellationError()) }
9490
}
91+
92+
extension UnsafeContinuation: Continuable {}
93+
extension CheckedContinuation: Continuable {}
94+
95+
protocol ThrowingContinuable: Continuable {
96+
/// The type of error to resume the continuation with in case of failure.
97+
associatedtype Failure = Error
98+
/// Suspends the current task, then calls the given closure
99+
/// with a throwing continuation for the current task.
100+
///
101+
/// The continuation can be resumed exactly once,
102+
/// subsequent resumes have different behavior depending on type implemeting.
103+
///
104+
/// - Parameter fn: A closure that takes the throwing continuation parameter.
105+
/// You can resume the continuation exactly once.
106+
///
107+
/// - Returns: The value passed to the continuation by the closure.
108+
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
109+
@inlinable
110+
static func with(_ fn: (Self) -> Void) async throws -> Success
111+
}
112+
113+
extension UnsafeContinuation: ThrowingContinuable where E == Error {
114+
/// Suspends the current task, then calls the given closure
115+
/// with an unsafe throwing continuation for the current task.
116+
///
117+
/// The continuation must be resumed exactly once, subsequent resumes will cause runtime error.
118+
/// Use `CheckedContinuation` to capture relevant data in case of runtime errors.
119+
///
120+
/// - Parameter fn: A closure that takes an `UnsafeContinuation` parameter.
121+
/// You must resume the continuation exactly once.
122+
///
123+
/// - Returns: The value passed to the continuation by the closure.
124+
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
125+
@inlinable
126+
static func with(_ fn: (UnsafeContinuation<T, E>) -> Void) async throws -> T
127+
{
128+
return try await withUnsafeThrowingContinuation(fn)
129+
}
130+
}
131+
132+
extension CheckedContinuation: ThrowingContinuable where E == Error {
133+
/// Suspends the current task, then calls the given closure
134+
/// with a checked throwing continuation for the current task.
135+
///
136+
/// The continuation must be resumed exactly once, subsequent resumes will cause runtime error.
137+
/// `CheckedContinuation` logs messages proving additional info on these errors.
138+
/// Once all errors resolved, use `UnsafeContinuation` in release mode to benefit improved performance
139+
/// at the loss of additional runtime checks.
140+
///
141+
/// - Parameter fn: A closure that takes a `CheckedContinuation` parameter.
142+
/// You must resume the continuation exactly once.
143+
///
144+
/// - Returns: The value passed to the continuation by the closure.
145+
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
146+
@inlinable
147+
static func with(_ body: (CheckedContinuation<T, E>) -> Void) async throws
148+
-> T
149+
{
150+
return try await withCheckedThrowingContinuation(body)
151+
}
152+
}
153+
154+
#if DEBUG || ASYNCOBJECTS_USE_CHECKEDCONTINUATION
155+
/// The continuation type used in package in `DEBUG` mode
156+
/// or if `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag turned on.
157+
typealias GlobalContinuation<T, E: Error> = CheckedContinuation<T, E>
158+
#else
159+
/// The continuation type used in package in `RELEASE` mode
160+
///and in absence of `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag.
161+
typealias GlobalContinuation<T, E: Error> = UnsafeContinuation<T, E>
162+
#endif

Sources/AsyncObjects/TaskOperation.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
164164

165165
// MARK: AsyncObject Impl
166166
/// The suspended tasks continuation type.
167-
private typealias Continuation = UnsafeContinuation<Void, Error>
167+
private typealias Continuation = GlobalContinuation<Void, Error>
168168
/// The continuations stored with an associated key for all the suspended task that are waitig for opearation completion.
169169
private var continuations: [UUID: Continuation] = [:]
170170

@@ -205,7 +205,7 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
205205
public func wait() async {
206206
guard !isFinished else { return }
207207
let key = UUID()
208-
try? await withUnsafeThrowingContinuationCancellationHandler(
208+
try? await withThrowingContinuationCancellationHandler(
209209
handler: { [weak self] (continuation: Continuation) in
210210
Task { [weak self] in
211211
self?.removeContinuation(withKey: key)

Sources/AsyncObjects/TaskQueue.swift

Lines changed: 23 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public actor TaskQueue: AsyncObject {
147147

148148
let key = UUID()
149149
do {
150-
try await withOnceResumableThrowingContinuationCancellationHandler(
150+
try await withThrowingContinuationCancellationHandler(
151151
handler: { [weak self] continuation in
152152
Task { [weak self] in
153153
await self?.dequeueContinuation(withKey: key)
@@ -205,7 +205,7 @@ public actor TaskQueue: AsyncObject {
205205
guard barriered || !queue.isEmpty else { return await runTask() }
206206
let key = UUID()
207207
do {
208-
try await withOnceResumableThrowingContinuationCancellationHandler(
208+
try await withThrowingContinuationCancellationHandler(
209209
handler: { [weak self] continuation in
210210
Task { [weak self] in
211211
await self?.dequeueContinuation(withKey: key)
@@ -237,49 +237,6 @@ public actor TaskQueue: AsyncObject {
237237
}
238238
}
239239

240-
/// Suspends the current task, then calls the given closure with a safe throwing continuation for the current task.
241-
/// Continuation is cancelled with error if current task is cancelled and cancellation handler is immediately invoked.
242-
///
243-
/// This operation cooperatively checks for cancellation and reacting to it by cancelling the safe throwing continuation with an error
244-
/// and the cancellation handler is always and immediately invoked after that.
245-
/// For example, even if the operation is running code that never checks for cancellation,
246-
/// a cancellation handler still runs and provides a chance to run some cleanup code.
247-
///
248-
/// - Parameters:
249-
/// - handler: A closure that is called after cancelling continuation.
250-
/// Resuming the continuation in closure will not have any effect.
251-
/// - fn: A closure that takes an `SafeContinuation` parameter.
252-
/// Continuation can be resumed exactly once and subsequent resuming are ignored and has no effect..
253-
///
254-
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
255-
/// - Returns: The value passed to the continuation.
256-
///
257-
/// - Note: The continuation provided in cancellation handler is already resumed with cancellation error.
258-
/// Trying to resume the continuation here will have no effect.
259-
private func withOnceResumableThrowingContinuationCancellationHandler<
260-
T: Sendable
261-
>(
262-
handler: @Sendable (SafeContinuation<T, Error>) -> Void,
263-
_ fn: (SafeContinuation<T, Error>) -> Void
264-
) async throws -> T {
265-
typealias Continuation = SafeContinuation<T, Error>
266-
let wrapper = ContinuationWrapper<Continuation>()
267-
let value = try await withTaskCancellationHandler {
268-
guard let continuation = wrapper.value else { return }
269-
wrapper.cancel(withError: CancellationError())
270-
handler(continuation)
271-
} operation: { () -> T in
272-
let value = try await withUnsafeThrowingContinuation {
273-
(c: UnsafeContinuation<T, Error>) in
274-
let continuation = SafeContinuation(continuation: c)
275-
wrapper.value = continuation
276-
fn(continuation)
277-
}
278-
return value
279-
}
280-
return value
281-
}
282-
283240
/// A mechanism to interface between synchronous and asynchronous code,
284241
/// ignoring correctness violations.
285242
///
@@ -302,7 +259,7 @@ struct SafeContinuation<T: Sendable, E: Error>: Continuable {
302259
}
303260

304261
/// The underlying continuation used.
305-
private let wrappedValue: UnsafeContinuation<T, E>
262+
private let wrappedValue: GlobalContinuation<T, E>
306263
/// Keeps track whether continuation can be resumed,
307264
/// to make sure continuation only resumes once.
308265
private let resumable: Flag = {
@@ -318,9 +275,7 @@ struct SafeContinuation<T: Sendable, E: Error>: Continuable {
318275
/// don’t use it outside of this object.
319276
///
320277
/// - Returns: The newly created safe continuation.
321-
init(
322-
continuation: UnsafeContinuation<T, E>
323-
) {
278+
init(continuation: GlobalContinuation<T, E>) {
324279
self.wrappedValue = continuation
325280
}
326281

@@ -381,3 +336,22 @@ struct SafeContinuation<T: Sendable, E: Error>: Continuable {
381336
resumable.off()
382337
}
383338
}
339+
340+
extension SafeContinuation: ThrowingContinuable where E == Error {
341+
/// Suspends the current task, then calls the given closure
342+
/// with a safe throwing continuation for the current task.
343+
///
344+
/// The continuation can be resumed exactly once, subsequent resumes are ignored.
345+
///
346+
/// - Parameter fn: A closure that takes the throwing continuation parameter.
347+
/// You can resume the continuation exactly once.
348+
///
349+
/// - Returns: The value passed to the continuation by the closure.
350+
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
351+
@inlinable
352+
static func with(_ fn: (SafeContinuation<T, E>) -> Void) async throws -> T {
353+
return try await GlobalContinuation<T, E>.with { continuation in
354+
fn(.init(continuation: continuation))
355+
}
356+
}
357+
}

0 commit comments

Comments
 (0)