Skip to content

Commit 70209ed

Browse files
authored
fix(DataStore): Reconcile locally sourced mutations while subscriptions are disabled (#1712)
* fix(DataStore): Reconcile locally sourced mutations while subscriptions are disabled * add integration test
1 parent e09a2ed commit 70209ed

22 files changed

+246
-37
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOperation.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ final class InitialSyncOperation: AsynchronousOperation {
183183
let items = syncQueryResult.items
184184
recordsReceived += UInt(items.count)
185185

186-
reconciliationQueue.offer(items, modelSchema: modelSchema)
186+
reconciliationQueue.offer(items, modelName: modelSchema.name)
187187
for item in items {
188188
initialSyncOperationTopic.send(.enqueued(item, modelName: modelSchema.name))
189189
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ extension OutgoingMutationQueue {
1515
enum Action {
1616
// Startup/config actions
1717
case initialized
18-
case receivedStart(APICategoryGraphQLBehavior, MutationEventPublisher)
18+
case receivedStart(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?)
1919
case receivedSubscription
2020

2121
// Event loop

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Resolver.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ extension OutgoingMutationQueue {
1818
case (.notInitialized, .initialized):
1919
return .stopped
2020

21-
case (.stopped, .receivedStart(let api, let mutationEventPublisher)):
22-
return .starting(api, mutationEventPublisher)
21+
case (.stopped, .receivedStart(let api, let mutationEventPublisher, let reconciliationQueue)):
22+
return .starting(api, mutationEventPublisher, reconciliationQueue)
2323

2424
case (.starting, .receivedSubscription):
2525
return .requestingEvent

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ extension OutgoingMutationQueue {
1616
// Startup/config states
1717
case notInitialized
1818
case stopped
19-
case starting(APICategoryGraphQLBehavior, MutationEventPublisher)
19+
case starting(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?)
2020

2121
// Event loop
2222
case requestingEvent

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import AWSPluginsCore
1515
protocol OutgoingMutationQueueBehavior: AnyObject {
1616
func stopSyncingToCloud(_ completion: @escaping BasicClosure)
1717
func startSyncingToCloud(api: APICategoryGraphQLBehavior,
18-
mutationEventPublisher: MutationEventPublisher)
18+
mutationEventPublisher: MutationEventPublisher,
19+
reconciliationQueue: IncomingEventReconciliationQueue?)
1920
var publisher: AnyPublisher<MutationEvent, Never> { get }
2021
}
2122

@@ -34,6 +35,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
3435
)
3536

3637
private weak var api: APICategoryGraphQLBehavior?
38+
private weak var reconciliationQueue: IncomingEventReconciliationQueue?
3739

3840
private var subscription: Subscription?
3941
private let dataStoreConfiguration: DataStoreConfiguration
@@ -86,9 +88,10 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
8688
// MARK: - Public API
8789

8890
func startSyncingToCloud(api: APICategoryGraphQLBehavior,
89-
mutationEventPublisher: MutationEventPublisher) {
91+
mutationEventPublisher: MutationEventPublisher,
92+
reconciliationQueue: IncomingEventReconciliationQueue?) {
9093
log.verbose(#function)
91-
stateMachine.notify(action: .receivedStart(api, mutationEventPublisher))
94+
stateMachine.notify(action: .receivedStart(api, mutationEventPublisher, reconciliationQueue))
9295
}
9396

9497
func stopSyncingToCloud(_ completion: @escaping BasicClosure) {
@@ -104,8 +107,8 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
104107

105108
switch newState {
106109

107-
case .starting(let api, let mutationEventPublisher):
108-
doStart(api: api, mutationEventPublisher: mutationEventPublisher)
110+
case .starting(let api, let mutationEventPublisher, let reconciliationQueue):
111+
doStart(api: api, mutationEventPublisher: mutationEventPublisher, reconciliationQueue: reconciliationQueue)
109112

110113
case .requestingEvent:
111114
requestEvent()
@@ -131,9 +134,11 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
131134
/// the publisher. After subscribing to the publisher, return actions:
132135
/// - receivedSubscription
133136
private func doStart(api: APICategoryGraphQLBehavior,
134-
mutationEventPublisher: MutationEventPublisher) {
137+
mutationEventPublisher: MutationEventPublisher,
138+
reconciliationQueue: IncomingEventReconciliationQueue?) {
135139
log.verbose(#function)
136140
self.api = api
141+
self.reconciliationQueue = reconciliationQueue
137142

138143
queryMutationEventsFromStorage {
139144
self.operationQueue.isSuspended = false
@@ -241,6 +246,18 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
241246
private func processSuccessEvent(_ mutationEvent: MutationEvent,
242247
mutationSync: MutationSync<AnyModel>?) {
243248
if let mutationSync = mutationSync {
249+
guard let reconciliationQueue = reconciliationQueue else {
250+
let dataStoreError = DataStoreError.configuration(
251+
"reconciliationQueue is unexpectedly nil",
252+
"""
253+
The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
254+
\(AmplifyErrorMessages.reportBugToAWS())
255+
"""
256+
)
257+
stateMachine.notify(action: .errored(dataStoreError))
258+
return
259+
}
260+
reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName)
244261
MutationEvent.reconcilePendingMutationEventsVersion(
245262
sent: mutationEvent,
246263
received: mutationSync,

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Action.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ extension RemoteSyncEngine {
2121
case clearedStateOutgoingMutations(APICategoryGraphQLBehavior, StorageEngineAdapter)
2222
case initializedSubscriptions
2323
case performedInitialSync
24-
case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher)
24+
case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?)
2525
case activatedMutationQueue
2626
case notifiedSyncStarted
2727
case cleanedUp(AmplifyError)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ extension RemoteSyncEngine {
4242
remoteSyncTopicPublisher.send(.subscriptionsActivated)
4343
if let api = self.api {
4444
stateMachine.notify(action: .activatedCloudSubscriptions(api,
45-
mutationEventPublisher))
45+
mutationEventPublisher,
46+
reconciliationQueue))
4647
}
4748
case .paused:
4849
remoteSyncTopicPublisher.send(.subscriptionsPaused)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Resolver.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ extension RemoteSyncEngine {
3737
return .cleaningUp(error)
3838

3939
case (.activatingCloudSubscriptions, .activatedCloudSubscriptions(let api,
40-
let mutationEventPublisher)):
41-
return .activatingMutationQueue(api, mutationEventPublisher)
40+
let mutationEventPublisher,
41+
let reconciliationQueue)):
42+
return .activatingMutationQueue(api, mutationEventPublisher, reconciliationQueue)
4243
case (.activatingCloudSubscriptions, .errored(let error)):
4344
return .cleaningUp(error)
4445

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+State.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ extension RemoteSyncEngine {
2020
case initializingSubscriptions(APICategoryGraphQLBehavior, StorageEngineAdapter)
2121
case performingInitialSync
2222
case activatingCloudSubscriptions
23-
case activatingMutationQueue(APICategoryGraphQLBehavior, MutationEventPublisher)
23+
case activatingMutationQueue(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?)
2424
case notifyingSyncStarted
2525

2626
case syncEngineActive

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine.swift

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,10 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
173173
performInitialSync()
174174
case .activatingCloudSubscriptions:
175175
activateCloudSubscriptions()
176-
case .activatingMutationQueue(let api, let mutationEventPublisher):
176+
case .activatingMutationQueue(let api, let mutationEventPublisher, let reconciliationQueue):
177177
startMutationQueue(api: api,
178-
mutationEventPublisher: mutationEventPublisher)
178+
mutationEventPublisher: mutationEventPublisher,
179+
reconciliationQueue: reconciliationQueue)
179180
case .notifyingSyncStarted:
180181
notifySyncStarted()
181182

@@ -359,10 +360,12 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
359360
}
360361

361362
private func startMutationQueue(api: APICategoryGraphQLBehavior,
362-
mutationEventPublisher: MutationEventPublisher) {
363+
mutationEventPublisher: MutationEventPublisher,
364+
reconciliationQueue: IncomingEventReconciliationQueue?) {
363365
log.debug(#function)
364366
outgoingMutationQueue.startSyncingToCloud(api: api,
365-
mutationEventPublisher: mutationEventPublisher)
367+
mutationEventPublisher: mutationEventPublisher,
368+
reconciliationQueue: reconciliationQueue)
366369

367370
remoteSyncTopicPublisher.send(.mutationQueueStarted)
368371
stateMachine.notify(action: .activatedMutationQueue)

0 commit comments

Comments
 (0)