Skip to content

Commit a5b6ce0

Browse files
authored
chore: kickoff release
2 parents dc32a9d + f7488e3 commit a5b6ce0

File tree

3 files changed

+57
-15
lines changed

3 files changed

+57
-15
lines changed

Amplify/Core/Support/TaskQueue.swift

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,25 @@
88
import Foundation
99

1010
/// A helper for executing asynchronous work serially.
11-
public actor TaskQueue<Success> {
12-
private var previousTask: Task<Success, Error>?
11+
public class TaskQueue<Success> {
12+
typealias Block = @Sendable () async -> Void
13+
private var streamContinuation: AsyncStream<Block>.Continuation!
1314

14-
public init() {}
15+
public init() {
16+
let stream = AsyncStream<Block>.init { continuation in
17+
streamContinuation = continuation
18+
}
19+
20+
Task {
21+
for await block in stream {
22+
_ = await block()
23+
}
24+
}
25+
}
26+
27+
deinit {
28+
streamContinuation.finish()
29+
}
1530

1631
/// Serializes asynchronous requests made from an async context
1732
///
@@ -25,17 +40,31 @@ public actor TaskQueue<Success> {
2540
/// TaskQueue serializes this work so that `doAsync1` is performed before `doAsync2`,
2641
/// which is performed before `doAsync3`.
2742
public func sync(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
28-
let currentTask: Task<Success, Error> = Task { [previousTask] in
29-
_ = await previousTask?.result
30-
return try await block()
43+
try await withCheckedThrowingContinuation { continuation in
44+
streamContinuation.yield {
45+
do {
46+
let value = try await block()
47+
continuation.resume(returning: value)
48+
} catch {
49+
continuation.resume(throwing: error)
50+
}
51+
}
3152
}
32-
previousTask = currentTask
33-
return try await currentTask.value
3453
}
3554

36-
public nonisolated func async(block: @Sendable @escaping () async throws -> Success) rethrows {
37-
Task {
38-
try await sync(block: block)
55+
public func async(block: @Sendable @escaping () async throws -> Success) {
56+
streamContinuation.yield {
57+
do {
58+
_ = try await block()
59+
} catch {
60+
Self.log.warn("Failed to handle async task in TaskQueue<\(Success.self)> with error: \(error)")
61+
}
3962
}
4063
}
4164
}
65+
66+
extension TaskQueue {
67+
public static var log: Logger {
68+
Amplify.Logging.logger(forNamespace: String(describing: self))
69+
}
70+
}

AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,7 @@ public class AWSGraphQLSubscriptionTaskRunner<R: Decodable>: InternalTaskRunner,
4646

4747
public func cancel() {
4848
self.send(GraphQLSubscriptionEvent<R>.connection(.disconnected))
49-
Task { [weak self] in
50-
guard let self else {
51-
return
52-
}
49+
Task {
5350
guard let appSyncClient = self.appSyncClient else {
5451
return
5552
}

AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,20 @@ final class AmplifyTaskQueueTests: XCTestCase {
6666
await fulfillment(of: [expectation1, expectation2, expectation3], enforceOrder: true)
6767
}
6868

69+
func testAsync() async throws {
70+
let taskCount = 1_000
71+
let expectations: [XCTestExpectation] = (0..<taskCount).map {
72+
expectation(description: "Expected execution of a task number \($0)")
73+
}
74+
75+
let taskQueue = TaskQueue<Void>()
76+
77+
for i in 0..<taskCount {
78+
taskQueue.async {
79+
expectations[i].fulfill()
80+
}
81+
}
82+
83+
await fulfillment(of: expectations, enforceOrder: true)
84+
}
6985
}

0 commit comments

Comments
 (0)