Skip to content

Commit e51aee8

Browse files
5dlawmicha
andauthored
fix(datastore): change OutgoingMutationQueue use TaskQueue for state transitions (#3720)
* fix(datastore): change OutgoingMutationQueue use TaskQueue for state transition * Update AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift Co-authored-by: Michael Law <[email protected]> --------- Co-authored-by: Michael Law <[email protected]>
1 parent 63b4c1b commit e51aee8

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
2727
private let operationQueue: OperationQueue
2828

2929
/// A DispatchQueue for synchronizing state on the mutation queue
30-
private let mutationDispatchQueue = DispatchQueue(
31-
label: "com.amazonaws.OutgoingMutationQueue",
32-
target: DispatchQueue.global()
33-
)
30+
private let mutationDispatchQueue = TaskQueue<Void>()
3431

3532
private weak var api: APICategoryGraphQLBehaviorExtended?
3633
private weak var reconciliationQueue: IncomingEventReconciliationQueue?
@@ -55,7 +52,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
5552

5653
let operationQueue = OperationQueue()
5754
operationQueue.name = "com.amazonaws.OutgoingMutationOperationQueue"
58-
operationQueue.underlyingQueue = mutationDispatchQueue
55+
operationQueue.qualityOfService = .default
5956
operationQueue.maxConcurrentOperationCount = 1
6057
operationQueue.isSuspended = true
6158

@@ -139,6 +136,10 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
139136

140137
queryMutationEventsFromStorage { [weak self] in
141138
guard let self = self else { return }
139+
guard case .starting = self.stateMachine.state else {
140+
self.log.debug("Unexpected state transition while performing `doStart()` during `.starting` state. Current state: \(self.stateMachine.state).")
141+
return
142+
}
142143

143144
self.operationQueue.isSuspended = false
144145
// State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)`

0 commit comments

Comments
 (0)