@@ -61,7 +61,7 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
6161 self . taskStreamContinuation = taskStreamContinuation
6262
6363 func beginExecuting(
64- _ operation: sending @escaping ( isolated ActorType) async -> Void ,
64+ _ operation: sending @escaping ( isolated ActorType) async -> Sendable ,
6565 in context: isolated ActorType
6666 ) {
6767 // In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
@@ -80,6 +80,7 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
8080 actorTask. task,
8181 in: actorTask. executionContext
8282 )
83+ await actorTask. sempahore. signal ( )
8384 }
8485 }
8586 }
@@ -101,65 +102,246 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
101102 weakExecutionContext = actor
102103 }
103104
104- /// Schedules an asynchronous task for execution and immediately returns.
105- /// The scheduled task will not execute until all prior tasks have completed or suspended.
106- /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
107- public func enqueue( _ task: @escaping @Sendable ( isolated ActorType) async -> Void ) {
108- taskStreamContinuation. yield ( ActorTask ( executionContext: executionContext, task: task) )
109- }
110-
111- /// Schedules an asynchronous task and returns after the task is complete.
112- /// The scheduled task will not execute until all prior tasks have completed or suspended.
113- /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
114- /// - Returns: The value returned from the enqueued task.
115- public func enqueueAndWait< T: Sendable > ( _ task: @escaping @Sendable ( isolated ActorType) async -> T ) async -> T {
116- let executionContext = self . executionContext // Capture/retain the executionContext before suspending.
117- return await withUnsafeContinuation { continuation in
118- taskStreamContinuation. yield ( ActorTask ( executionContext: executionContext) { executionContext in
119- continuation. resume ( returning: await task ( executionContext) )
120- } )
121- }
122- }
123-
124- /// Schedules an asynchronous throwing task and returns after the task is complete.
125- /// The scheduled task will not execute until all prior tasks have completed or suspended.
126- /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
127- /// - Returns: The value returned from the enqueued task.
128- public func enqueueAndWait< T: Sendable > ( _ task: @escaping @Sendable ( isolated ActorType) async throws -> T ) async throws -> T {
129- let executionContext = self . executionContext // Capture/retain the executionContext before suspending.
130- return try await withUnsafeThrowingContinuation { continuation in
131- taskStreamContinuation. yield ( ActorTask ( executionContext: executionContext) { executionContext in
132- do {
133- continuation. resume ( returning: try await task ( executionContext) )
134- } catch {
135- continuation. resume ( throwing: error)
136- }
137- } )
138- }
139- }
140-
141- // MARK: Private
105+ // MARK: Fileprivate
142106
143- private let taskStreamContinuation : AsyncStream < ActorTask > . Continuation
107+ fileprivate let taskStreamContinuation : AsyncStream < ActorTask > . Continuation
144108
145109 /// The actor on whose isolated context our tasks run, force-unwrapped.
146110 /// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment.
147- private var executionContext : ActorType {
111+ fileprivate var executionContext : ActorType {
148112 // Crashing here means that this queue is being sent tasks either before an execution context has been set, or
149113 // after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted
150114 // actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor.
151115 weakExecutionContext!
152116 }
117+
118+ fileprivate struct ActorTask : Sendable {
119+ init ( executionContext: ActorType , task: @escaping @Sendable ( isolated ActorType) async -> Void ) {
120+ self . executionContext = executionContext
121+ self . task = task
122+ }
123+
124+ let executionContext : ActorType
125+ let sempahore = Semaphore ( )
126+ let task : @Sendable ( isolated ActorType) async -> Void
127+ }
128+
129+ // MARK: Private
130+
153131 /// The actor on whose isolated context our tasks run.
154132 /// We must use`weak` here to avoid creating a retain cycle between the adopted actor and this actor queue.
155133 ///
156134 /// We will assume this execution context always exists for the lifecycle of the queue because:
157135 /// 1. The lifecycle of any `ActorQueue` must not exceed the lifecycle of its adopted `actor`.
158136 /// 2. The adopted `actor` must set itself as the execution context for this queue within its `init` method.
159137 private weak var weakExecutionContext : ActorType ?
138+ }
160139
161- private struct ActorTask {
162- let executionContext : ActorType
163- let task : @Sendable ( isolated ActorType) async -> Void
140+ extension Task {
141+ /// Runs the given nonthrowing operation asynchronously
142+ /// as part of a new top-level task on behalf of the current actor.
143+ /// The operation will not execute until all prior tasks have
144+ /// completed or suspended.
145+ ///
146+ /// Use this function when creating asynchronous work
147+ /// that operates on behalf of the synchronous function that calls it.
148+ /// Like `Task.detached(priority:operation:)`,
149+ /// this function creates a separate, top-level task.
150+ /// Unlike `Task.detached(priority:operation:)`,
151+ /// the task created by `Task.init(priority:operation:)`
152+ /// inherits the priority and actor context of the caller,
153+ /// so the operation is treated more like an asynchronous extension
154+ /// to the synchronous operation.
155+ ///
156+ /// You need to keep a reference to the task
157+ /// if you want to cancel it by calling the `Task.cancel()` method.
158+ /// Discarding your reference to a detached task
159+ /// doesn't implicitly cancel that task,
160+ /// it only makes it impossible for you to explicitly cancel the task.
161+ ///
162+ /// - Parameters:
163+ /// - priority: The priority of the task.
164+ /// Pass `nil` to use the priority from `Task.currentPriority`.
165+ /// - actorQueue: The queue on which to enqueue the task.
166+ /// - operation: The operation to perform.
167+ @discardableResult
168+ public init < ActorType: Actor > (
169+ priority: TaskPriority ? = nil ,
170+ enqueuedOn actorQueue: ActorQueue < ActorType > ,
171+ operation: @Sendable @escaping ( isolated ActorType) async -> Success
172+ ) where Failure == Never {
173+ let delivery = Delivery < Success , Failure > ( )
174+ let task = ActorQueue< ActorType> . ActorTask(
175+ executionContext: actorQueue. executionContext,
176+ task: { executionContext in
177+ await delivery. sendValue ( operation ( executionContext) )
178+ }
179+ )
180+ actorQueue. taskStreamContinuation. yield ( task)
181+ self . init ( priority: priority) {
182+ await task. sempahore. wait ( )
183+ return await delivery. getValue ( )
184+ }
185+ }
186+
187+ /// Runs the given throwing operation asynchronously
188+ /// as part of a new top-level task on behalf of the current actor.
189+ /// The operation will not execute until all prior tasks have
190+ /// completed or suspended.
191+ ///
192+ /// Use this function when creating asynchronous work
193+ /// that operates on behalf of the synchronous function that calls it.
194+ /// Like `Task.detached(priority:operation:)`,
195+ /// this function creates a separate, top-level task.
196+ /// Unlike `Task.detached(priority:operation:)`,
197+ /// the task created by `Task.init(priority:operation:)`
198+ /// inherits the priority and actor context of the caller,
199+ /// so the operation is treated more like an asynchronous extension
200+ /// to the synchronous operation.
201+ ///
202+ /// You need to keep a reference to the task
203+ /// if you want to cancel it by calling the `Task.cancel()` method.
204+ /// Discarding your reference to a detached task
205+ /// doesn't implicitly cancel that task,
206+ /// it only makes it impossible for you to explicitly cancel the task.
207+ ///
208+ /// - Parameters:
209+ /// - priority: The priority of the task.
210+ /// Pass `nil` to use the priority from `Task.currentPriority`.
211+ /// - actorQueue: The queue on which to enqueue the task.
212+ /// - operation: The operation to perform.
213+ @discardableResult
214+ public init < ActorType: Actor > (
215+ priority: TaskPriority ? = nil ,
216+ enqueuedOn actorQueue: ActorQueue < ActorType > ,
217+ operation: @escaping @Sendable ( isolated ActorType) async throws -> Success
218+ ) where Failure == any Error {
219+ let delivery = Delivery < Success , Failure > ( )
220+ let task = ActorQueue< ActorType> . ActorTask(
221+ executionContext: actorQueue. executionContext,
222+ task: { executionContext in
223+ do {
224+ try await delivery. sendValue ( operation ( executionContext) )
225+ } catch {
226+ await delivery. sendFailure ( error)
227+ }
228+ }
229+ )
230+
231+ actorQueue. taskStreamContinuation. yield ( task)
232+ self . init ( priority: priority) {
233+ await task. sempahore. wait ( )
234+ return try await delivery. getValue ( )
235+ }
236+ }
237+
238+ /// Runs the given nonthrowing operation asynchronously
239+ /// as part of a new top-level task on behalf of the current actor.
240+ /// The operation will not execute until all prior tasks have
241+ /// completed or suspended.
242+ ///
243+ /// Use this function when creating asynchronous work
244+ /// that operates on behalf of the synchronous function that calls it.
245+ /// Like `Task.detached(priority:operation:)`,
246+ /// this function creates a separate, top-level task.
247+ /// Unlike `Task.detached(priority:operation:)`,
248+ /// the task created by `Task.init(priority:operation:)`
249+ /// inherits the priority and actor context of the caller,
250+ /// so the operation is treated more like an asynchronous extension
251+ /// to the synchronous operation.
252+ ///
253+ /// You need to keep a reference to the task
254+ /// if you want to cancel it by calling the `Task.cancel()` method.
255+ /// Discarding your reference to a detached task
256+ /// doesn't implicitly cancel that task,
257+ /// it only makes it impossible for you to explicitly cancel the task.
258+ ///
259+ /// - Parameters:
260+ /// - priority: The priority of the task.
261+ /// Pass `nil` to use the priority from `Task.currentPriority`.
262+ /// - actorQueue: The queue on which to enqueue the task.
263+ /// - operation: The operation to perform.
264+ @discardableResult
265+ public init (
266+ priority: TaskPriority ? = nil ,
267+ enqueuedOn actorQueue: ActorQueue < MainActor > ,
268+ operation: @MainActor @escaping ( ) async -> Success
269+ ) where Failure == Never {
270+ let delivery = Delivery < Success , Failure > ( )
271+ let task = ActorQueue< MainActor> . ActorTask(
272+ executionContext: actorQueue. executionContext,
273+ task: { executionContext in
274+ await delivery. sendValue ( operation ( ) )
275+ }
276+ )
277+ actorQueue. taskStreamContinuation. yield ( task)
278+ self . init ( priority: priority) {
279+ await task. sempahore. wait ( )
280+ return await delivery. getValue ( )
281+ }
282+ }
283+
284+ /// Runs the given throwing operation asynchronously
285+ /// as part of a new top-level task on behalf of the current actor.
286+ /// The operation will not execute until all prior tasks have
287+ /// completed or suspended.
288+ ///
289+ /// Use this function when creating asynchronous work
290+ /// that operates on behalf of the synchronous function that calls it.
291+ /// Like `Task.detached(priority:operation:)`,
292+ /// this function creates a separate, top-level task.
293+ /// Unlike `Task.detached(priority:operation:)`,
294+ /// the task created by `Task.init(priority:operation:)`
295+ /// inherits the priority and actor context of the caller,
296+ /// so the operation is treated more like an asynchronous extension
297+ /// to the synchronous operation.
298+ ///
299+ /// You need to keep a reference to the task
300+ /// if you want to cancel it by calling the `Task.cancel()` method.
301+ /// Discarding your reference to a detached task
302+ /// doesn't implicitly cancel that task,
303+ /// it only makes it impossible for you to explicitly cancel the task.
304+ ///
305+ /// - Parameters:
306+ /// - priority: The priority of the task.
307+ /// Pass `nil` to use the priority from `Task.currentPriority`.
308+ /// - actorQueue: The queue on which to enqueue the task.
309+ /// - operation: The operation to perform.
310+ @discardableResult
311+ public init (
312+ priority: TaskPriority ? = nil ,
313+ enqueuedOn actorQueue: ActorQueue < MainActor > ,
314+ operation: @escaping @MainActor ( ) async throws -> Success
315+ ) where Failure == any Error {
316+ let delivery = Delivery < Success , Failure > ( )
317+ let task = ActorQueue< MainActor> . ActorTask(
318+ executionContext: actorQueue. executionContext,
319+ task: { executionContext in
320+ do {
321+ try await delivery. sendValue ( operation ( ) )
322+ } catch {
323+ await delivery. sendFailure ( error)
324+ }
325+ }
326+ )
327+
328+ actorQueue. taskStreamContinuation. yield ( task)
329+ self . init ( priority: priority) {
330+ await task. sempahore. wait ( )
331+ return try await delivery. getValue ( )
332+ }
164333 }
165334}
335+
336+ extension MainActor {
337+ /// A global instance of an `ActorQueue<MainActor>`.
338+ public static var queue : ActorQueue < MainActor > {
339+ mainActorQueue
340+ }
341+ }
342+
343+ private let mainActorQueue = {
344+ let queue = ActorQueue < MainActor > ( )
345+ queue. adoptExecutionContext ( of: MainActor . shared)
346+ return queue
347+ } ( )
0 commit comments