Skip to content

Commit 8e3f7bd

Browse files
authored
fix(Datastore): Consecutive Updates (Save, Sync, Update and Immediately Delete Scenario) (#1407)
* fix(Datastore): consecutive updates * Add unit tests * Address review comments and add more unit tests * Address review comments * Address review comments * Address review comments * Ignore service updated fields for model equality check * Add createdAt to list of service updated fields, add more tests * Address review comments * Address review comments * Update code comments
1 parent d28d415 commit 8e3f7bd

File tree

7 files changed

+859
-146
lines changed

7 files changed

+859
-146
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,9 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
241241
private func processSuccessEvent(_ mutationEvent: MutationEvent,
242242
mutationSyncMetadata: MutationSync<AnyModel>?) {
243243
if let mutationSyncMetadata = mutationSyncMetadata {
244-
MutationEvent.updatePendingMutationEventVersionIfNil(
245-
for: mutationEvent.modelId,
246-
mutationSync: mutationSyncMetadata,
244+
MutationEvent.reconcilePendingMutationEventsVersion(
245+
sent: mutationEvent,
246+
received: mutationSyncMetadata,
247247
storageAdapter: storageAdapter) { _ in
248248
self.completeProcessingEvent(mutationEvent, mutationSyncMetadata: mutationSyncMetadata)
249249
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Foundation
9+
import Amplify
10+
11+
// swiftlint:disable cyclomatic_complexity
12+
extension ModelSchema {
13+
14+
private static let serviceUpdatedFields: Set = ["updatedAt", "createdAt"]
15+
16+
/// Compare two `Model` based on a given `ModelSchema`
17+
/// Returns true if equal, false otherwise
18+
/// Currently, schemas where system timestamps fields (createdAt, updatedAt)
19+
/// are renamed using with `@model`'s `timestamps` attribute and explicitly
20+
/// added to the input schema are not supported by this check since they are not
21+
/// marked as "read-only" fields and will fail the check when the service generates
22+
/// and returns the value of `createdAt` or `updatedAt`.
23+
/// for e.g.
24+
/// type Post @model(timestamps:{createdAt: "createdOn", updatedAt: "updatedOn"}) {
25+
/// id: ID!
26+
/// title: String!
27+
/// tags: [String!]!
28+
/// createdOn: AWSDateTime
29+
/// updatedOn: AWSDateTime
30+
/// }
31+
func compare(_ model1: Model, _ model2: Model) -> Bool {
32+
let modelType1 = ModelRegistry.modelType(from: model1.modelName)
33+
let modelType2 = ModelRegistry.modelType(from: model2.modelName)
34+
if modelType1 != modelType2 {
35+
// no need to compare models as they have different type
36+
return false
37+
}
38+
39+
for (fieldName, modelField) in fields {
40+
// read only fields or fields updated from the service are skipped for equality check
41+
// examples of such fields include `createdAt`, `updatedAt` and `coverId` in `Record`
42+
if modelField.isReadOnly || ModelSchema.serviceUpdatedFields.contains(modelField.name) {
43+
continue
44+
}
45+
46+
let value1 = model1[fieldName] ?? nil
47+
let value2 = model2[fieldName] ?? nil
48+
49+
// check equality for different `ModelFieldType`
50+
switch modelField.type {
51+
case .string:
52+
guard let value1Optional = value1 as? String?, let value2Optional = value2 as? String? else {
53+
return false
54+
}
55+
if !compare(value1Optional, value2Optional) {
56+
return false
57+
}
58+
case .int:
59+
guard let value1Optional = value1 as? Int?, let value2Optional = value2 as? Int? else {
60+
return false
61+
}
62+
if !compare(value1Optional, value2Optional) {
63+
return false
64+
}
65+
case .double:
66+
guard let value1Optional = value1 as? Double?, let value2Optional = value2 as? Double? else {
67+
return false
68+
}
69+
if !compare(value1Optional, value2Optional) {
70+
return false
71+
}
72+
case .date:
73+
guard let value1Optional = value1 as? Temporal.Date?,
74+
let value2Optional = value2 as? Temporal.Date? else {
75+
return false
76+
}
77+
if !compare(value1Optional, value2Optional) {
78+
return false
79+
}
80+
case .dateTime:
81+
guard let value1Optional = value1 as? Temporal.DateTime?,
82+
let value2Optional = value2 as? Temporal.DateTime? else {
83+
return false
84+
}
85+
if !compare(value1Optional, value2Optional) {
86+
return false
87+
}
88+
case .time:
89+
guard let value1Optional = value1 as? Temporal.Time?,
90+
let value2Optional = value2 as? Temporal.Time? else {
91+
return false
92+
}
93+
if !compare(value1Optional, value2Optional) {
94+
return false
95+
}
96+
case .timestamp:
97+
guard let value1Optional = value1 as? String?, let value2Optional = value2 as? String? else {
98+
return false
99+
}
100+
if !compare(value1Optional, value2Optional) {
101+
return false
102+
}
103+
case .bool:
104+
guard let value1Optional = value1 as? Bool?, let value2Optional = value2 as? Bool? else {
105+
return false
106+
}
107+
if !compare(value1Optional?.intValue, value2Optional?.intValue) {
108+
return false
109+
}
110+
case .enum:
111+
guard case .some(Optional<Any>.some(let value1Optional)) = value1,
112+
case .some(Optional<Any>.some(let value2Optional)) = value2 else {
113+
if value1 == nil && value2 == nil {
114+
continue
115+
}
116+
return false
117+
}
118+
let enumValue1Optional = (value1Optional as? EnumPersistable)?.rawValue
119+
let enumValue2Optional = (value2Optional as? EnumPersistable)?.rawValue
120+
if !compare(enumValue1Optional, enumValue2Optional) {
121+
return false
122+
}
123+
case .embedded, .embeddedCollection:
124+
do {
125+
if let encodable1 = value1 as? Encodable,
126+
let encodable2 = value2 as? Encodable {
127+
let json1 = try SQLiteModelValueConverter.toJSON(encodable1)
128+
let json2 = try SQLiteModelValueConverter.toJSON(encodable2)
129+
if !compare(json1, json2) {
130+
return false
131+
}
132+
}
133+
} catch {
134+
continue
135+
}
136+
// only the first level of data is used for comparison of models
137+
// and deeper levels(associated models/connections) are ignored
138+
// e.g. The graphql request contains only the information needed in the graphql variables which is sent to
139+
// the service. In such a case, the request model may have multiple levels of data while the response
140+
// model will have just one.
141+
case .model, .collection:
142+
continue
143+
}
144+
}
145+
return true
146+
}
147+
148+
private func compare<T: Comparable>(_ value1: T?, _ value2: T?) -> Bool {
149+
switch (value1, value2) {
150+
case(nil, nil):
151+
return true
152+
case(nil, .some):
153+
return false
154+
case (.some, nil):
155+
return false
156+
case (.some(let val1), .some(let val2)):
157+
return val1 == val2 ? true : false
158+
}
159+
}
160+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/Support/MutationEvent+Extensions.swift

Lines changed: 74 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,44 +10,93 @@ import Dispatch
1010
import AWSPluginsCore
1111

1212
extension MutationEvent {
13-
14-
// Updates the head of pending mutation event queue for a given model `id`
15-
// if it has a `nil` version, with syncMetadata version in `mutationSync`
16-
// and saves it in the mutation event table
17-
static func updatePendingMutationEventVersionIfNil(for modelId: Model.Identifier,
18-
mutationSync: MutationSync<AnyModel>,
19-
storageAdapter: StorageEngineAdapter,
20-
completion: @escaping DataStoreCallback<Void>) {
13+
// Consecutive operations that modify a model results in a sequence of pending mutation events that
14+
// have the current version of the model. The first mutation event has the correct version of the model,
15+
// while the subsequent events will have lower versions if the first mutation event is successfully synced
16+
// to the cloud. By reconciling the pending mutation events after syncing the first mutation event,
17+
// we attempt to update the pending version to the latest version from the response.
18+
// The before and after conditions for consecutive update scenarios are as below:
19+
// - Save, then immediately update
20+
// Queue Before - [(version: nil, inprocess: true, type: .create),
21+
// (version: nil, inprocess: false, type: .update)]
22+
// Response - [version: 1, type: .create]
23+
// Queue After - [(version: 1, inprocess: false, type: .update)]
24+
// - Save, then immediately delete
25+
// Queue Before - [(version: nil, inprocess: true, type: .create),
26+
// (version: nil, inprocess: false, type: .delete)]
27+
// Response - [version: 1, type: .create]
28+
// Queue After - [(version: 1, inprocess: false, type: .delete)]
29+
// - Save, sync, then immediately update and delete
30+
// Queue Before (After save, sync)
31+
// - [(version: 1, inprocess: true, type: .update), (version: 1, inprocess: false, type: .delete)]
32+
// Response - [version: 2, type: .update]
33+
// Queue After - [(version: 2, inprocess: false, type: .delete)]
34+
//
35+
// For a given model `id`, checks the version of the head of pending mutation event queue
36+
// against the API response version in `mutationSync` and saves it in the mutation event table if
37+
// the response version is a newer one
38+
static func reconcilePendingMutationEventsVersion(sent mutationEvent: MutationEvent,
39+
received mutationSync: MutationSync<AnyModel>,
40+
storageAdapter: StorageEngineAdapter,
41+
completion: @escaping DataStoreCallback<Void>) {
2142
MutationEvent.pendingMutationEvents(
22-
for: modelId,
43+
for: mutationEvent.modelId,
2344
storageAdapter: storageAdapter) { queryResult in
2445
switch queryResult {
2546
case .failure(let dataStoreError):
2647
completion(.failure(dataStoreError))
2748
case .success(let localMutationEvents):
28-
guard var existingEvent = localMutationEvents.first else {
49+
guard let existingEvent = localMutationEvents.first else {
2950
completion(.success(()))
3051
return
3152
}
3253

33-
if existingEvent.version == nil {
34-
Amplify.log.verbose("""
35-
Replacing existing mutation event having nil version with version from mutation response
36-
\(mutationSync.syncMetadata.version)
37-
""")
38-
existingEvent.version = mutationSync.syncMetadata.version
39-
storageAdapter.save(existingEvent, condition: nil) { result in
40-
switch result {
41-
case .failure(let dataStoreError):
42-
completion(.failure(dataStoreError))
43-
case .success:
44-
completion(.success(()))
45-
}
46-
}
47-
} else {
54+
guard let reconciledEvent = reconcile(pendingMutationEvent: existingEvent,
55+
with: mutationEvent,
56+
responseMutationSync: mutationSync) else {
4857
completion(.success(()))
58+
return
4959
}
60+
61+
storageAdapter.save(reconciledEvent, condition: nil) { result in
62+
switch result {
63+
case .failure(let dataStoreError):
64+
completion(.failure(dataStoreError))
65+
case .success:
66+
completion(.success(()))
67+
}
68+
}
69+
}
70+
}
71+
}
72+
73+
static func reconcile(pendingMutationEvent: MutationEvent,
74+
with requestMutationEvent: MutationEvent,
75+
responseMutationSync: MutationSync<AnyModel>) -> MutationEvent? {
76+
// return if version of the pending mutation event is not nil and
77+
// is >= version contained in the response
78+
if pendingMutationEvent.version != nil &&
79+
pendingMutationEvent.version! >= responseMutationSync.syncMetadata.version {
80+
return nil
81+
}
82+
83+
do {
84+
let responseModel = responseMutationSync.model.instance
85+
let requestModel = try requestMutationEvent.decodeModel()
86+
87+
// check if the data sent in the request is the same as the response
88+
// if it is, update the pending mutation event version to the response version
89+
guard let modelSchema = ModelRegistry.modelSchema(from: requestMutationEvent.modelName),
90+
modelSchema.compare(responseModel, requestModel) else {
91+
return nil
5092
}
93+
94+
var pendingMutationEvent = pendingMutationEvent
95+
pendingMutationEvent.version = responseMutationSync.syncMetadata.version
96+
return pendingMutationEvent
97+
} catch {
98+
Amplify.log.verbose("Error decoding models: \(error)")
99+
return nil
51100
}
52101
}
53102

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreConsecutiveUpdatesTests.swift

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,7 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
255255

256256
let saveSyncReceived = expectation(description: "Received create mutation event on subscription for Post")
257257
let updateSyncReceived = expectation(description: "Received update mutation event on subscription for Post")
258-
// two update events are triggered
259-
// 1. update is performed successfully with version 1, and comes back with version 2
260-
// 2. delete is performed with version 1, response is conflict resolution, and when processing
261-
// the error reponse, we apply the remote model to local store, triggering a second update
262-
updateSyncReceived.expectedFulfillmentCount = 2
263-
264-
// this can be uncommented once the delete is successfully sent with version 2
265-
// let deleteSyncReceived = expectation(description: "Received delete mutation event on subscription for Post")
258+
let deleteSyncReceived = expectation(description: "Received delete mutation event on subscription for Post")
266259

267260
let hubListener = Amplify.Hub.listen(
268261
to: .dataStore,
@@ -292,9 +285,8 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
292285

293286
if mutationEvent.mutationType == GraphQLMutationType.delete.rawValue {
294287
XCTAssertEqual(post, updatedPost)
295-
// this needs to be commented out once the bug is fixed
296-
// deleteSyncReceived.fulfill()
297-
// XCTAssertEqual(mutationEvent.version, 3)
288+
deleteSyncReceived.fulfill()
289+
XCTAssertEqual(mutationEvent.version, 3)
298290
return
299291
}
300292
}
@@ -340,19 +332,11 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
340332
let queryResult = queryPost(byId: newPost.id)
341333
XCTAssertNil(queryResult)
342334

343-
wait(for: [updateSyncReceived], timeout: networkTimeout)
344-
// can be uncommented once delete mutation response is success
345-
// wait(for: [deleteSyncExpectation], timeout: networkTimeout)
335+
wait(for: [updateSyncReceived, deleteSyncReceived], timeout: networkTimeout)
346336

347337
// query the deleted post
348338
let queryResultAfterSync = queryPost(byId: updatedPost.id)
349-
350-
// this should be removed once the bug is fixed
351-
XCTAssertNotNil(queryResultAfterSync)
352-
XCTAssertEqual(queryResultAfterSync, updatedPost)
353-
354-
// this is the actual behavior which is currently failing
355-
// XCTAssertNil(post)
339+
XCTAssertNil(queryResultAfterSync)
356340

357341
let queryRequest =
358342
GraphQLRequest<MutationSyncResult?>.query(modelName: updatedPost.modelName, byId: updatedPost.id)
@@ -370,14 +354,9 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
370354
XCTAssertEqual(post.model["title"] as? String, updatedPost.title)
371355
XCTAssertEqual(post.model["content"] as? String, updatedPost.content)
372356
XCTAssertEqual(post.model["rating"] as? Double, updatedPost.rating)
373-
// the post should actually be deleted, but this is currently failing
374-
XCTAssertFalse(post.syncMetadata.deleted)
375-
376-
// can be uncommented once delete mutation response is success
377-
// currently the API request for delete mutation is sent with version 1, which
378-
// fails with error message "Conflict resolver rejects mutation."
379-
// XCTAssertTrue(post.syncMetadata.deleted)
380-
// XCTAssertEqual(post.syncMetadata.version, 3)
357+
358+
XCTAssertTrue(post.syncMetadata.deleted)
359+
XCTAssertEqual(post.syncMetadata.version, 3)
381360
apiQuerySuccess.fulfill()
382361
case .failure(let error):
383362
XCTFail("Error: \(error)")

0 commit comments

Comments
 (0)