Skip to content

Commit b70c98a

Browse files
authored
fix(DataStore): Reconcile and save serially (#1128)
* fix(DataStore): Reconcile and save serially * fix(DataStore): address PR comments * fix(DataStore): fixed test logic
1 parent 8bf2610 commit b70c98a

File tree

14 files changed

+309
-35
lines changed

14 files changed

+309
-35
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOperationEvent.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ enum InitialSyncOperationEvent {
1313
/// Used by `SyncEventEmitter` and `ModelSyncedEmitted`
1414
case started(modelName: ModelName, syncType: SyncType)
1515

16-
/// Published when a remote model is enqueued for local store reconcillation.
16+
/// Published when a remote model is enqueued for local store reconciliation.
1717
/// Used by `ModelSyncedEventEmitter` for record counting.
1818
case enqueued(MutationSync<AnyModel>)
1919

20-
/// Published when the sync operation has completed and all remote models have been enqueued for reconcillation.
20+
/// Published when the sync operation has completed and all remote models have been enqueued for reconciliation.
2121
/// Used by `ModelSyncedEventEmitter` to determine when to send `ModelSyncedEvent`
2222
case finished(modelName: ModelName)
2323
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
4040
}
4141

4242
private let connectionStatusSerialQueue: DispatchQueue
43+
private var reconcileAndSaveQueue: ReconcileAndSaveOperationQueue
4344
private var reconciliationQueues: [ModelName: ModelReconciliationQueue]
4445
private var reconciliationQueueConnectionStatus: [ModelName: Bool]
4546
private var modelReconciliationQueueFactory: ModelReconciliationQueueFactory
@@ -58,12 +59,14 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
5859
self.eventReconciliationQueueTopic = PassthroughSubject<IncomingEventReconciliationQueueEvent, DataStoreError>()
5960
self.reconciliationQueues = [:]
6061
self.reconciliationQueueConnectionStatus = [:]
62+
self.reconcileAndSaveQueue = ReconcileAndSaveQueue(modelSchemas)
6163
self.modelReconciliationQueueFactory = modelReconciliationQueueFactory ??
62-
AWSModelReconciliationQueue.init(modelSchema:storageAdapter:api:modelPredicate:auth:incomingSubscriptionEvents:)
64+
AWSModelReconciliationQueue.init(modelSchema:storageAdapter:api:reconcileAndSaveQueue:modelPredicate:auth:incomingSubscriptionEvents:)
6365
// TODO: Add target for SyncEngine system to help prevent thread explosion and increase performance
6466
// https://github.com/aws-amplify/amplify-ios/issues/399
6567
self.connectionStatusSerialQueue
6668
= DispatchQueue(label: "com.amazonaws.DataStore.AWSIncomingEventReconciliationQueue")
69+
6770
for modelSchema in modelSchemas {
6871
let modelName = modelSchema.name
6972
let syncExpression = syncExpressions.first(where: {
@@ -73,6 +76,7 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
7376
let queue = self.modelReconciliationQueueFactory(modelSchema,
7477
storageAdapter,
7578
api,
79+
reconcileAndSaveQueue,
7680
modelPredicate,
7781
auth,
7882
nil)
@@ -177,6 +181,14 @@ extension AWSIncomingEventReconciliationQueue: Resettable {
177181
queue.reset { group.leave() }
178182
}
179183
}
184+
185+
group.enter()
186+
DispatchQueue.global().async {
187+
self.reconcileAndSaveQueue.cancelAllOperations()
188+
self.reconcileAndSaveQueue.waitUntilOperationsAreFinished()
189+
group.leave()
190+
}
191+
180192
group.wait()
181193
onComplete()
182194
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ typealias ModelReconciliationQueueFactory = (
1616
ModelSchema,
1717
StorageEngineAdapter,
1818
APICategoryGraphQLBehavior,
19+
ReconcileAndSaveOperationQueue,
1920
QueryPredicate?,
2021
AuthCategoryBehavior?,
2122
IncomingSubscriptionEventPublisher?
@@ -27,15 +28,17 @@ typealias ModelReconciliationQueueFactory = (
2728
/// Although subscriptions are listened to and enqueued at initialization, you must call `start` on a
2829
/// AWSModelReconciliationQueue to write events to the DataStore.
2930
///
30-
/// Internally, a AWSModelReconciliationQueue manages different operation queues:
31-
/// - A queue to buffer incoming remote events (e.g., subscriptions, mutation results)
32-
/// - A queue to reconcile & save mutation sync events to local storage
31+
/// Internally, a AWSModelReconciliationQueue manages the `incomingSubscriptionEventQueue` to buffer incoming remote
32+
/// events (e.g., subscriptions, mutation results), and is passed the reference of `ReconcileAndSaveOperationQueue`,
33+
/// used to reconcile & save mutation sync events to local storage. A reference to the `ReconcileAndSaveOperationQueue`
34+
/// is used here since some models have to be reconciled in dependency order and `ReconcileAndSaveOperationQueue` is responsible for managing the
35+
/// ordering of these events.
3336
/// These queues are required because each of these actions have different points in the sync lifecycle at which they
3437
/// may be activated.
3538
///
3639
/// Flow:
3740
/// - `AWSModelReconciliationQueue` init()
38-
/// - `reconcileAndSaveQueue` created and activated
41+
/// - `reconcileAndSaveQueue` queue for reconciliation and local save operations, passed in from initializer.
3942
/// - `incomingSubscriptionEventQueue` created, but suspended
4043
/// - `incomingEventsSink` listener set up for incoming remote events
4144
/// - when `incomingEventsSink` listener receives an event, it adds an operation to `incomingSubscriptionEventQueue`
@@ -62,7 +65,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
6265

6366
/// Applies incoming mutation or subscription events serially to local data store for this model type. This queue
6467
/// is always active.
65-
private let reconcileAndSaveQueue: OperationQueue
68+
private let reconcileAndSaveQueue: ReconcileAndSaveOperationQueue
6669

6770
private var incomingEventsSink: AnyCancellable?
6871
private var reconcileAndLocalSaveOperationSinks: AtomicValue<Set<AnyCancellable?>>
@@ -75,6 +78,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
7578
init(modelSchema: ModelSchema,
7679
storageAdapter: StorageEngineAdapter?,
7780
api: APICategoryGraphQLBehavior,
81+
reconcileAndSaveQueue: ReconcileAndSaveOperationQueue,
7882
modelPredicate: QueryPredicate?,
7983
auth: AuthCategoryBehavior?,
8084
incomingSubscriptionEvents: IncomingSubscriptionEventPublisher? = nil) {
@@ -85,11 +89,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
8589
self.modelPredicate = modelPredicate
8690
self.modelReconciliationQueueSubject = PassthroughSubject<ModelReconciliationQueueEvent, DataStoreError>()
8791

88-
self.reconcileAndSaveQueue = OperationQueue()
89-
reconcileAndSaveQueue.name = "com.amazonaws.DataStore.\(modelSchema.name).reconcile"
90-
reconcileAndSaveQueue.maxConcurrentOperationCount = 1
91-
reconcileAndSaveQueue.underlyingQueue = DispatchQueue.global()
92-
reconcileAndSaveQueue.isSuspended = false
92+
self.reconcileAndSaveQueue = reconcileAndSaveQueue
9393

9494
self.incomingSubscriptionEventQueue = OperationQueue()
9595
incomingSubscriptionEventQueue.name = "com.amazonaws.DataStore.\(modelSchema.name).remoteEvent"
@@ -131,7 +131,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
131131
incomingEventsSink?.cancel()
132132
incomingEventsSink = nil
133133
incomingSubscriptionEvents.cancel()
134-
reconcileAndSaveQueue.cancelAllOperations()
134+
reconcileAndSaveQueue.cancelOperations(modelName: modelSchema.name)
135135
incomingSubscriptionEventQueue.cancelAllOperations()
136136
}
137137

@@ -154,7 +154,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
154154
}
155155
})
156156
reconcileAndLocalSaveOperationSinks.with { $0.insert(reconcileAndLocalSaveOperationSink) }
157-
reconcileAndSaveQueue.addOperation(reconcileOp)
157+
reconcileAndSaveQueue.addOperation(reconcileOp, modelName: remoteModel.model.modelName)
158158
}
159159

160160
private func receive(_ receive: IncomingSubscriptionEventPublisherEvent) {
@@ -214,13 +214,6 @@ extension AWSModelReconciliationQueue: Resettable {
214214
}
215215
}
216216

217-
group.enter()
218-
DispatchQueue.global().async {
219-
self.reconcileAndSaveQueue.cancelAllOperations()
220-
self.reconcileAndSaveQueue.waitUntilAllOperationsAreFinished()
221-
group.leave()
222-
}
223-
224217
group.enter()
225218
DispatchQueue.global().async {
226219
self.incomingSubscriptionEventQueue.cancelAllOperations()

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
2626
/// sent from the remote API as part of the mutation.
2727
typealias AppliedModel = MutationSync<AnyModel>
2828

29+
let id: UUID = UUID()
2930
private let workQueue = DispatchQueue(label: "com.amazonaws.ReconcileAndLocalSaveOperation",
3031
target: DispatchQueue.global())
3132

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import Combine
10+
import Foundation
11+
import AWSPluginsCore
12+
13+
@available(iOS 13.0, *)
14+
protocol ReconcileAndSaveOperationQueue {
15+
func addOperation(_ operation: ReconcileAndLocalSaveOperation, modelName: String)
16+
17+
func cancelOperations(modelName: String)
18+
19+
func cancelAllOperations()
20+
21+
func waitUntilOperationsAreFinished()
22+
23+
init(_ modelSchemas: [ModelSchema])
24+
}
25+
26+
enum ReconcileAndSaveQueueEvent {
27+
case operationAdded(id: UUID)
28+
case operationRemoved(id: UUID)
29+
case cancelledOperations(modelName: String)
30+
}
31+
32+
/// A queue used to enqueue `ReconcileAndLocalSaveOperation`s which perform reconcile and save for incoming mutation
33+
/// sync events to local storage for all model types.
34+
///
35+
/// Internally, a `ReconcileAndSaveQueue` will manage an operation queue with concurrency of 1 to perform serial
36+
/// operations to ensure models are processed in the order of dependency. For example, an initial sync
37+
/// will perform sync queries for models based on its dependency order. The results are then processed serially by this
38+
/// queue to ensure that a model A that depends on B, and B depends on C, will be reconciled in the order of C then B
39+
/// then A. This also ensures that reconciliation for individual subscription events are also processed in the order
40+
/// in which they are received by the system.
41+
///
42+
/// Additionally, this queue allows per model type cancellations on the operations that are enqueued by calling
43+
/// `cancelOperations(modelName)`. This allows per model type clean up, while allowing other model reconcilliations to
44+
/// continue to operate.
45+
@available(iOS 13.0, *)
46+
class ReconcileAndSaveQueue: ReconcileAndSaveOperationQueue {
47+
48+
private let serialQueue = DispatchQueue(label: "com.amazonaws.ReconcileAndSaveQueue.serialQueue",
49+
target: DispatchQueue.global())
50+
private let reconcileAndSaveQueue: OperationQueue
51+
private var modelReconcileAndSaveOperations: [String: [UUID: ReconcileAndLocalSaveOperation]]
52+
private var reconcileAndLocalSaveOperationSinks: AtomicValue<Set<AnyCancellable?>>
53+
54+
private let reconcileAndSaveQueueSubject: PassthroughSubject<ReconcileAndSaveQueueEvent, DataStoreError>
55+
var publisher: AnyPublisher<ReconcileAndSaveQueueEvent, DataStoreError> {
56+
reconcileAndSaveQueueSubject.eraseToAnyPublisher()
57+
}
58+
required init(_ modelSchemas: [ModelSchema]) {
59+
self.reconcileAndSaveQueueSubject = PassthroughSubject<ReconcileAndSaveQueueEvent, DataStoreError>()
60+
self.reconcileAndSaveQueue = OperationQueue()
61+
reconcileAndSaveQueue.name = "com.amazonaws.DataStore.reconcile"
62+
reconcileAndSaveQueue.maxConcurrentOperationCount = 1
63+
reconcileAndSaveQueue.underlyingQueue = DispatchQueue.global()
64+
reconcileAndSaveQueue.isSuspended = false
65+
66+
self.modelReconcileAndSaveOperations = [String: [UUID: ReconcileAndLocalSaveOperation]]()
67+
for model in modelSchemas {
68+
modelReconcileAndSaveOperations[model.name] = [UUID: ReconcileAndLocalSaveOperation]()
69+
}
70+
self.reconcileAndLocalSaveOperationSinks = AtomicValue(initialValue: Set<AnyCancellable?>())
71+
}
72+
73+
func addOperation(_ operation: ReconcileAndLocalSaveOperation, modelName: String) {
74+
serialQueue.async {
75+
var reconcileAndLocalSaveOperationSink: AnyCancellable?
76+
77+
reconcileAndLocalSaveOperationSink = operation.publisher.sink { _ in
78+
self.reconcileAndLocalSaveOperationSinks.with { $0.remove(reconcileAndLocalSaveOperationSink) }
79+
self.modelReconcileAndSaveOperations[modelName]?[operation.id] = nil
80+
self.reconcileAndSaveQueueSubject.send(.operationRemoved(id: operation.id))
81+
} receiveValue: { _ in }
82+
83+
self.reconcileAndLocalSaveOperationSinks.with { $0.insert(reconcileAndLocalSaveOperationSink) }
84+
self.modelReconcileAndSaveOperations[modelName]?[operation.id] = operation
85+
self.reconcileAndSaveQueue.addOperation(operation)
86+
self.reconcileAndSaveQueueSubject.send(.operationAdded(id: operation.id))
87+
}
88+
}
89+
90+
func cancelOperations(modelName: String) {
91+
serialQueue.async {
92+
if let operations = self.modelReconcileAndSaveOperations[modelName] {
93+
operations.values.forEach { operation in
94+
operation.cancel()
95+
}
96+
}
97+
self.modelReconcileAndSaveOperations[modelName]?.removeAll()
98+
self.reconcileAndSaveQueueSubject.send(.cancelledOperations(modelName: modelName))
99+
}
100+
}
101+
102+
func cancelAllOperations() {
103+
serialQueue.async {
104+
self.reconcileAndSaveQueue.cancelAllOperations()
105+
for (modelName, _) in self.modelReconcileAndSaveOperations {
106+
self.modelReconcileAndSaveOperations[modelName]?.removeAll()
107+
self.reconcileAndSaveQueueSubject.send(.cancelledOperations(modelName: modelName))
108+
}
109+
}
110+
}
111+
112+
// This method should only be used in the `reset` chain, which is an internal reset functionality that is used
113+
// for resetting the state of the system in testing. It blocks the current thread by not executing the work on
114+
// the serial queue since underlying operation queue's `waitUntilAllOperationsAreFinished()` behaves the same way.
115+
// See the following link for more details:
116+
// https://developer.apple.com/documentation/foundation/operationqueue/1407971-waituntilalloperationsarefinishe
117+
func waitUntilOperationsAreFinished() {
118+
reconcileAndSaveQueue.waitUntilAllOperationsAreFinished()
119+
}
120+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginAuthIntegrationTests/AWSDataStoreCategoryPluginAuthIntegrationTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class AWSDataStoreCategoryPluginAuthIntegrationTests: XCTestCase {
6464
}
6565

6666
/// A user can persist data in the local store without signing in. Once the user signs in,
67-
/// the sync engine will start and sync the mutations to the cloud. Once the reconcillation is complete, retrieving
67+
/// the sync engine will start and sync the mutations to the cloud. Once the reconciliation is complete, retrieving
6868
/// the same data will contain ownerId
6969
///
7070
/// - Given: A DataStore plugin configured with SocialNote model containing with auth rules

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreHubEventsTests.swift

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
2323
/// - When:
2424
/// - DataStore's remote sync engine is initialized
2525
/// - Then:
26-
/// - networkStatus received, payload should be: {active: true}
26+
/// - networkStatus received, payload should be: {active: false}, followed by {active: true}
2727
/// - subscriptionEstablished received, payload should be nil
2828
/// - syncQueriesStarted received, payload should be: {models: ["Post", "Comment"]}
2929
/// - outboxStatus received, payload should be {isEmpty: true}
@@ -33,6 +33,8 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
3333
func testDataStoreConfiguredDispatchesHubEvents() throws {
3434

3535
let networkStatusReceived = expectation(description: "networkStatus received")
36+
networkStatusReceived.expectedFulfillmentCount = 2
37+
var networkStatusActive = false
3638
let subscriptionsEstablishedReceived = expectation(description: "subscriptionsEstablished received")
3739
let syncQueriesStartedReceived = expectation(description: "syncQueriesStarted received")
3840
let outboxStatusReceived = expectation(description: "outboxStatus received")
@@ -43,7 +45,10 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
4345
XCTFail("Failed to cast payload data as NetworkStatusEvent")
4446
return
4547
}
46-
XCTAssertEqual(networkStatusEvent.active, true)
48+
XCTAssertEqual(networkStatusEvent.active, networkStatusActive)
49+
if !networkStatusActive {
50+
networkStatusActive = true
51+
}
4752
networkStatusReceived.fulfill()
4853
}
4954

@@ -57,7 +62,7 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
5762
XCTFail("Failed to cast payload data as SyncQueriesStartedEvent")
5863
return
5964
}
60-
XCTAssertEqual(syncQueriesStartedEvent.models.count, 2)
65+
XCTAssertEqual(syncQueriesStartedEvent.models.count, 16)
6166
syncQueriesStartedReceived.fulfill()
6267
}
6368

@@ -131,6 +136,7 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
131136

132137
func testModelSyncedAndSyncQueriesReady() throws {
133138
let modelSyncedReceived = expectation(description: "outboxMutationEnqueued received")
139+
modelSyncedReceived.assertForOverFulfill = false
134140
let syncQueriesReadyReceived = expectation(description: "outboxMutationProcessed received")
135141

136142
let expectedSyncedModelNames = ["Post", "Comment"]
@@ -142,7 +148,11 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
142148
XCTFail("Failed to cast payload data as ModelSyncedEvent")
143149
return
144150
}
145-
modelSyncedEvents.append(modelSyncedEvent)
151+
152+
if expectedSyncedModelNames.contains(modelSyncedEvent.modelName) {
153+
modelSyncedEvents.append(modelSyncedEvent)
154+
}
155+
146156
if modelSyncedEvents.count == 2 {
147157
XCTAssertEqual(modelSyncedEvents[0].modelName, expectedSyncedModelNames[0])
148158
XCTAssertTrue(modelSyncedEvents[0].isFullSync)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/InitialSync/SyncEventEmitterTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class SyncEventEmitterTests: XCTestCase {
3030
/// - When:
3131
/// - One model is registered
3232
/// - Perform an initial sync
33-
/// - Reconcillation of the models occurred
33+
/// - Reconciliation of the models occurred
3434
/// - Then:
3535
/// - One modelSynced event should be received
3636
/// - One syncQueriesReady event should be received
@@ -100,7 +100,7 @@ class SyncEventEmitterTests: XCTestCase {
100100
/// - SyncType of DeltaSync on Comment Model should be performed
101101
/// - No SyncQueries comming back
102102
/// - Perform an initial sync
103-
/// - Reconcillation of the models occurred
103+
/// - Reconciliation of the models occurred
104104
/// - Then:
105105
/// - Two modelSynced event should be received
106106
/// - One syncQueriesReady event should be received
@@ -176,7 +176,7 @@ class SyncEventEmitterTests: XCTestCase {
176176
/// - SyncType of FullSync, .delete of MutationType on Comment Model would be performed
177177
/// - One SyncQueries of each Model comming back
178178
/// - Perform an initial sync
179-
/// - Reconcillation of the models occurred
179+
/// - Reconciliation of the models occurred
180180
/// - Then:
181181
/// - Two modelSynced event should be received
182182
/// - One syncQueriesReady event should be received

0 commit comments

Comments
 (0)