Skip to content

Commit 71782aa

Browse files
authored
fix(datastore): ModelSyncedEvent dispatch consistency (#1823)
* fix(DataStore): ModelSyncedEvent dispatch consistency * address PR comments
1 parent 40008b2 commit 71782aa

14 files changed

+323
-83
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOperation.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ final class InitialSyncOperation: AsynchronousOperation {
193193
self.query(lastSyncTime: lastSyncTime, nextToken: nextToken)
194194
}
195195
} else {
196-
initialSyncOperationTopic.send(.finished(modelName: modelSchema.name))
197196
updateModelSyncMetadata(lastSyncTime: syncQueryResult.startedAt)
198197
}
199198
}
@@ -233,8 +232,10 @@ final class InitialSyncOperation: AsynchronousOperation {
233232
private func finish(result: AWSInitialSyncOrchestrator.SyncOperationResult) {
234233
switch result {
235234
case .failure(let error):
235+
initialSyncOperationTopic.send(.finished(modelName: modelSchema.name, error: error))
236236
initialSyncOperationTopic.send(completion: .failure(error))
237237
case .success:
238+
initialSyncOperationTopic.send(.finished(modelName: modelSchema.name))
238239
initialSyncOperationTopic.send(completion: .finished)
239240
}
240241
super.finish()

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOperationEvent.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ enum InitialSyncOperationEvent {
1919

2020
/// Published when the sync operation has completed and all remote models have been enqueued for reconciliation.
2121
/// Used by `ModelSyncedEventEmitter` to determine when to send `ModelSyncedEvent`
22-
case finished(modelName: ModelName)
22+
case finished(modelName: ModelName, error: DataStoreError? = nil)
2323
}
2424

2525
enum SyncType {

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOrchestrator.swift

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,21 +115,9 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator {
115115
initialSyncOperationSinks[modelSchema.name] = initialSyncForModel
116116
.publisher
117117
.receive(on: concurrencyQueue)
118-
.sink(receiveCompletion: { result in
119-
if case .failure(let dataStoreError) = result {
120-
let syncError = DataStoreError.sync(
121-
"An error occurred syncing \(modelSchema.name)",
122-
"",
123-
dataStoreError)
124-
self.syncErrors.append(syncError)
125-
126-
if self.isUnauthorizedError(syncError) {
127-
self.initialSyncOrchestratorTopic.send(.finished(modelName: modelSchema.name))
128-
}
129-
}
130-
self.initialSyncOperationSinks.removeValue(forKey: modelSchema.name)
131-
self.onReceiveCompletion()
132-
}, receiveValue: onReceiveValue(_:))
118+
.sink(receiveCompletion: { result in self.onReceiveCompletion(modelSchema: modelSchema,
119+
result: result) },
120+
receiveValue: onReceiveValue(_:))
133121

134122
syncOperationQueue.addOperation(initialSyncForModel)
135123
}
@@ -138,7 +126,17 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator {
138126
initialSyncOrchestratorTopic.send(value)
139127
}
140128

141-
private func onReceiveCompletion() {
129+
private func onReceiveCompletion(modelSchema: ModelSchema, result: Subscribers.Completion<DataStoreError>) {
130+
if case .failure(let dataStoreError) = result {
131+
let syncError = DataStoreError.sync(
132+
"An error occurred syncing \(modelSchema.name)",
133+
"",
134+
dataStoreError)
135+
self.syncErrors.append(syncError)
136+
}
137+
138+
initialSyncOperationSinks.removeValue(forKey: modelSchema.name)
139+
142140
guard initialSyncOperationSinks.isEmpty else {
143141
return
144142
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/ModelSyncedEventEmitter.swift

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import Foundation
1212

1313
enum IncomingModelSyncedEmitterEvent {
1414
case mutationEventApplied(MutationEvent)
15-
case mutationEventDropped(modelName: String)
15+
case mutationEventDropped(modelName: String, error: DataStoreError? = nil)
1616
case modelSyncedEvent(ModelSyncedEvent)
1717
}
1818

@@ -90,7 +90,7 @@ final class ModelSyncedEventEmitter {
9090
return modelSchema.name == modelName
9191
case .enqueued(_, let modelName):
9292
return modelSchema.name == modelName
93-
case .finished(let modelName):
93+
case .finished(let modelName, _):
9494
return modelSchema.name == modelName
9595
}
9696
}
@@ -101,7 +101,7 @@ final class ModelSyncedEventEmitter {
101101
switch value {
102102
case .mutationEventApplied(let event):
103103
return modelSchema.name == event.modelName
104-
case .mutationEventDropped(let modelName):
104+
case .mutationEventDropped(let modelName, _):
105105
return modelSchema.name == modelName
106106
case .initialized, .started, .paused:
107107
return false
@@ -128,8 +128,8 @@ final class ModelSyncedEventEmitter {
128128
switch value {
129129
case .mutationEventApplied(let event):
130130
modelSyncedEventTopic.send(.mutationEventApplied(event))
131-
case .mutationEventDropped(let modelName):
132-
modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName))
131+
case .mutationEventDropped(let modelName, let error):
132+
modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName, error: error))
133133
case .initialized, .started, .paused:
134134
return
135135
}
@@ -155,11 +155,9 @@ final class ModelSyncedEventEmitter {
155155
if shouldSendModelSyncedEvent {
156156
sendModelSyncedEvent()
157157
}
158-
case .mutationEventDropped(let modelName):
158+
case .mutationEventDropped(let modelName, let error):
159159
reconciledReceived += 1
160-
161-
modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName))
162-
160+
modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName, error: error))
163161
if shouldSendModelSyncedEvent {
164162
sendModelSyncedEvent()
165163
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/SyncEventEmitter.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import Foundation
1212

1313
enum IncomingSyncEventEmitterEvent {
1414
case mutationEventApplied(MutationEvent)
15-
case mutationEventDropped(modelName: String)
15+
case mutationEventDropped(modelName: String, error: DataStoreError? = nil)
1616
case modelSyncedEvent(ModelSyncedEvent)
1717
case syncQueriesReadyEvent
1818
}
@@ -71,8 +71,8 @@ final class SyncEventEmitter {
7171
switch value {
7272
case .mutationEventApplied(let mutationEvent):
7373
syncEventEmitterTopic.send(.mutationEventApplied(mutationEvent))
74-
case .mutationEventDropped(let modelName):
75-
syncEventEmitterTopic.send(.mutationEventDropped(modelName: modelName))
74+
case .mutationEventDropped(let modelName, let error):
75+
syncEventEmitterTopic.send(.mutationEventDropped(modelName: modelName, error: error))
7676
case .modelSyncedEvent(let modelSyncedEvent):
7777
modelSyncedReceived += 1
7878
syncEventEmitterTopic.send(.modelSyncedEvent(modelSyncedEvent))

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
121121
switch receiveValue {
122122
case .mutationEvent(let event):
123123
eventReconciliationQueueTopic.send(.mutationEventApplied(event))
124-
case .mutationEventDropped(let modelName):
125-
eventReconciliationQueueTopic.send(.mutationEventDropped(modelName: modelName))
124+
case .mutationEventDropped(let modelName, let error):
125+
eventReconciliationQueueTopic.send(.mutationEventDropped(modelName: modelName, error: error))
126126
case .connected(modelName: let modelName):
127127
connectionStatusSerialQueue.async {
128128
self.reconciliationQueueConnectionStatus[modelName] = true

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/IncomingEventReconciliationQueue.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ enum IncomingEventReconciliationQueueEvent {
1414
case started
1515
case paused
1616
case mutationEventApplied(MutationEvent)
17-
case mutationEventDropped(modelName: String)
17+
case mutationEventDropped(modelName: String, error: DataStoreError? = nil)
1818
}
1919

2020
/// A queue that reconciles all incoming events for a model: responses from locally-sourced mutations, and subscription

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
164164
return
165165
}
166166
switch value {
167-
case .mutationEventDropped(let modelName):
168-
self.modelReconciliationQueueSubject.send(.mutationEventDropped(modelName: modelName))
167+
case .mutationEventDropped(let modelName, let error):
168+
self.modelReconciliationQueueSubject.send(.mutationEventDropped(modelName: modelName, error: error))
169169
case .mutationEvent(let event):
170170
self.modelReconciliationQueueSubject.send(.mutationEvent(event))
171171
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ModelReconciliationQueue.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ enum ModelReconciliationQueueEvent {
2020
case connected(modelName: String)
2121
case disconnected(modelName: String, reason: ModelConnectionDisconnectedReason)
2222
case mutationEvent(MutationEvent)
23-
case mutationEventDropped(modelName: String)
23+
case mutationEventDropped(modelName: String, error: DataStoreError? = nil)
2424
}
2525

2626
@available(iOS 13.0, *)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
112112
}
113113

114114
guard let storageAdapter = storageAdapter else {
115-
stateMachine.notify(action: .errored(DataStoreError.nilStorageAdapter()))
115+
let error = DataStoreError.nilStorageAdapter()
116+
notifyDropped(count: remoteModels.count, error: error)
117+
stateMachine.notify(action: .errored(error))
116118
return
117119
}
118120

@@ -168,7 +170,9 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
168170
return
169171
}
170172
guard let storageAdapter = self.storageAdapter else {
171-
result = .failure(DataStoreError.nilStorageAdapter())
173+
let error = DataStoreError.nilStorageAdapter()
174+
self.notifyDropped(count: modelIds.count, error: error)
175+
result = .failure(error)
172176
return
173177
}
174178

@@ -181,6 +185,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
181185
storageAdapter: storageAdapter) { queryResult in
182186
switch queryResult {
183187
case .failure(let dataStoreError):
188+
self.notifyDropped(count: modelIds.count, error: dataStoreError)
184189
result = .failure(dataStoreError)
185190
case .success(let mutationEvents):
186191
result = .success(mutationEvents)
@@ -190,17 +195,13 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
190195
}
191196

192197
func reconcile(_ remoteModels: [RemoteModel], pendingMutations: [MutationEvent]) -> [RemoteModel] {
193-
guard let remoteModel = remoteModels.first else {
198+
guard !remoteModels.isEmpty else {
194199
return []
195200
}
196201

197202
let remoteModelsToApply = RemoteSyncReconciler.filter(remoteModels,
198203
pendingMutations: pendingMutations)
199-
200-
for _ in 0 ..< (remoteModels.count - remoteModelsToApply.count) {
201-
notifyDropped()
202-
}
203-
204+
notifyDropped(count: remoteModels.count - remoteModelsToApply.count)
204205
return remoteModelsToApply
205206
}
206207

@@ -217,7 +218,9 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
217218
return
218219
}
219220
guard let storageAdapter = self.storageAdapter else {
220-
result = .failure(DataStoreError.nilStorageAdapter())
221+
let error = DataStoreError.nilStorageAdapter()
222+
self.notifyDropped(count: remoteModels.count, error: error)
223+
result = .failure(error)
221224
return
222225
}
223226

@@ -232,24 +235,23 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
232235
modelName: self.modelSchema.name)
233236
result = .success((remoteModels, localMetadatas))
234237
} catch {
235-
result = .failure(DataStoreError(error: error))
238+
let error = DataStoreError(error: error)
239+
self.notifyDropped(count: remoteModels.count, error: error)
240+
result = .failure(error)
236241
return
237242
}
238243
}
239244
}
240245

241246
func getDispositions(for remoteModels: [RemoteModel],
242247
localMetadatas: [LocalMetadata]) -> [RemoteSyncReconciler.Disposition] {
243-
guard let remoteModel = remoteModels.first else {
248+
guard !remoteModels.isEmpty else {
244249
return []
245250
}
246251

247252
let dispositions = RemoteSyncReconciler.getDispositions(remoteModels,
248253
localMetadatas: localMetadatas)
249-
for _ in 0 ..< (remoteModels.count - dispositions.count) {
250-
notifyDropped()
251-
}
252-
254+
notifyDropped(count: remoteModels.count - dispositions.count)
253255
return dispositions
254256
}
255257

@@ -268,7 +270,9 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
268270
return
269271
}
270272
guard let storageAdapter = self.storageAdapter else {
271-
result = .failure(DataStoreError.nilStorageAdapter())
273+
let error = DataStoreError.nilStorageAdapter()
274+
self.notifyDropped(count: dispositions.count, error: error)
275+
result = .failure(error)
272276
return
273277
}
274278

@@ -348,8 +352,8 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
348352
condition: nil) { response in
349353
switch response {
350354
case .failure(let dataStoreError):
355+
self.notifyDropped(error: dataStoreError)
351356
if storageAdapter.shouldIgnoreError(error: dataStoreError) {
352-
self.notifyDropped()
353357
promise(.success(.dropped))
354358
} else {
355359
promise(.failure(dataStoreError))
@@ -367,8 +371,8 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
367371
storageAdapter.save(untypedModel: remoteModel.model.instance) { response in
368372
switch response {
369373
case .failure(let dataStoreError):
374+
self.notifyDropped(error: dataStoreError)
370375
if storageAdapter.shouldIgnoreError(error: dataStoreError) {
371-
self.notifyDropped()
372376
promise(.success(.dropped))
373377
} else {
374378
promise(.failure(dataStoreError))
@@ -379,6 +383,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
379383
anyModel = try savedModel.eraseToAnyModel()
380384
} catch {
381385
let dataStoreError = DataStoreError(error: error)
386+
self.notifyDropped(error: dataStoreError)
382387
promise(.failure(dataStoreError))
383388
return
384389
}
@@ -401,6 +406,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
401406
storageAdapter.save(inProcessModel.syncMetadata, condition: nil) { result in
402407
switch result {
403408
case .failure(let dataStoreError):
409+
self.notifyDropped(error: dataStoreError)
404410
promise(.failure(dataStoreError))
405411
case .success(let syncMetadata):
406412
let appliedModel = MutationSync(model: inProcessModel.model, syncMetadata: syncMetadata)
@@ -411,8 +417,10 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
411417
}
412418
}
413419

414-
private func notifyDropped() {
415-
mutationEventPublisher.send(.mutationEventDropped(modelName: modelSchema.name))
420+
private func notifyDropped(count: Int = 1, error: DataStoreError? = nil) {
421+
for _ in 0 ..< count {
422+
mutationEventPublisher.send(.mutationEventDropped(modelName: modelSchema.name, error: error))
423+
}
416424
}
417425

418426
private func notify(savedModel: AppliedModel,
@@ -456,5 +464,5 @@ extension ReconcileAndLocalSaveOperation: DefaultLogger { }
456464

457465
enum ReconcileAndLocalSaveOperationEvent {
458466
case mutationEvent(MutationEvent)
459-
case mutationEventDropped(modelName: String)
467+
case mutationEventDropped(modelName: String, error: DataStoreError? = nil)
460468
}

0 commit comments

Comments
 (0)