Skip to content

Commit d4de998

Browse files
authored
fix(Datastore): consecutive updates nil version scenario (#1333)
* fix(Datastore): consecutive updates nil version scenario * Address review comments * Move processing logic to OutgoingMutationQueue and add new files for Mutation Event extension * Add missing files for previous commit * Address review comments * Address review comments * Address review comments
1 parent 9f77181 commit d4de998

File tree

6 files changed

+296
-53
lines changed

6 files changed

+296
-53
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
222222
api: APICategoryGraphQLBehavior) {
223223
if case let .success(graphQLResponse) = result {
224224
if case let .success(graphQLResult) = graphQLResponse {
225-
completeProcessingEvent(mutationEvent,
225+
processSuccessEvent(mutationEvent,
226226
mutationSyncMetadata: graphQLResult)
227227
} else if case let .failure(graphQLResponseError) = graphQLResponse {
228228
processMutationErrorFromCloud(mutationEvent: mutationEvent,
@@ -238,6 +238,22 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
238238
}
239239
}
240240

241+
/// Process the successful response from API by updating the mutation events in
242+
/// mutation event table having `nil` version
243+
private func processSuccessEvent(_ mutationEvent: MutationEvent,
244+
mutationSyncMetadata: MutationSync<AnyModel>?) {
245+
if let mutationSyncMetadata = mutationSyncMetadata {
246+
MutationEvent.updatePendingMutationEventVersionIfNil(
247+
for: mutationEvent.modelId,
248+
mutationSync: mutationSyncMetadata,
249+
storageAdapter: storageAdapter) { _ in
250+
self.completeProcessingEvent(mutationEvent, mutationSyncMetadata: mutationSyncMetadata)
251+
}
252+
} else {
253+
completeProcessingEvent(mutationEvent, mutationSyncMetadata: mutationSyncMetadata)
254+
}
255+
}
256+
241257
private func processMutationErrorFromCloud(mutationEvent: MutationEvent,
242258
api: APICategoryGraphQLBehavior,
243259
apiError: APIError?,
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import Dispatch
10+
import AWSPluginsCore
11+
12+
extension MutationEvent {
13+
14+
// Updates the head of pending mutation event queue for a given model `id`
15+
// if it has a `nil` version, with syncMetadata version in `mutationSync`
16+
// and saves it in the mutation event table
17+
static func updatePendingMutationEventVersionIfNil(for modelId: Model.Identifier,
18+
mutationSync: MutationSync<AnyModel>,
19+
storageAdapter: StorageEngineAdapter,
20+
completion: @escaping DataStoreCallback<Void>) {
21+
MutationEvent.pendingMutationEvents(
22+
for: modelId,
23+
storageAdapter: storageAdapter) { queryResult in
24+
switch queryResult {
25+
case .failure(let dataStoreError):
26+
completion(.failure(dataStoreError))
27+
case .success(let localMutationEvents):
28+
guard var existingEvent = localMutationEvents.first else {
29+
completion(.success(()))
30+
return
31+
}
32+
33+
if existingEvent.version == nil {
34+
Amplify.log.verbose("""
35+
Replacing existing mutation event having nil version with version from mutation response
36+
\(mutationSync.syncMetadata.version)
37+
""")
38+
existingEvent.version = mutationSync.syncMetadata.version
39+
storageAdapter.save(existingEvent, condition: nil) { result in
40+
switch result {
41+
case .failure(let dataStoreError):
42+
completion(.failure(dataStoreError))
43+
case .success:
44+
completion(.success(()))
45+
}
46+
}
47+
} else {
48+
completion(.success(()))
49+
}
50+
}
51+
}
52+
}
53+
54+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreConsecutiveUpdatesTests.swift

Lines changed: 14 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import AWSPluginsCore
1616

1717
// swiftlint:disable cyclomatic_complexity
1818
// swiftlint:disable type_body_length
19-
// swiftlint:disable file_length
2019
class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
2120
/// - Given: API has been setup with `Post` model registered
2221
/// - When: A Post is saved and then immediately updated
@@ -58,14 +57,7 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
5857
}
5958

6059
if mutationEvent.mutationType == GraphQLMutationType.update.rawValue {
61-
// this should be removed once the bug is fixed
62-
// the bug is the update mutation event is being sent to the API with nil version,
63-
// causing a successful response with the existing post data
64-
XCTAssertEqual(post, newPost)
65-
66-
// this is the expected behavior which is currently failing
67-
// XCTAssertEqual(post, updatedPost)
68-
60+
XCTAssertEqual(post, updatedPost)
6961
XCTAssertEqual(mutationEvent.version, 2)
7062
updateSyncReceived.fulfill()
7163
return
@@ -110,13 +102,7 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
110102
return
111103
}
112104

113-
// this should be removed once the bug is fixed
114-
// the bug is the update mutation event is being sent to the API with nil version,
115-
// causing a successful response with the existing post data
116-
XCTAssertEqual(queryResultAfterSync, newPost)
117-
118-
// this is the expected behavior which is currently failing
119-
// XCTAssertEqual(queryResultAfterSync, updatedPost)
105+
XCTAssertEqual(queryResultAfterSync, updatedPost)
120106

121107
let queryRequest =
122108
GraphQLRequest<MutationSyncResult?>.query(modelName: updatedPost.modelName, byId: updatedPost.id)
@@ -131,17 +117,9 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
131117
return
132118
}
133119

134-
// this should be removed once the bug is fixed
135-
// the bug is the update mutation event is being sent to the API with nil version,
136-
// causing a successful response with the existing post data
137-
XCTAssertEqual(post.model["title"] as? String, newPost.title)
138-
XCTAssertEqual(post.model["content"] as? String, newPost.content)
139-
XCTAssertEqual(post.model["rating"] as? Double, newPost.rating)
140-
141-
// this is the expected behavior which is currently failing
142-
// XCTAssertEqual(post.title, updatedPost.title)
143-
// XCTAssertEqual(post.content, updatedPost.content)
144-
// XCTAssertEqual(post.rating, updatedPost.rating)
120+
XCTAssertEqual(post.model["title"] as? String, updatedPost.title)
121+
XCTAssertEqual(post.model["content"] as? String, updatedPost.content)
122+
XCTAssertEqual(post.model["rating"] as? Double, updatedPost.rating)
145123
XCTAssertEqual(post.syncMetadata.version, 2)
146124
apiQuerySuccess.fulfill()
147125
case .failure(let error):
@@ -167,11 +145,7 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
167145
status: .published)
168146

169147
let saveSyncReceived = expectation(description: "Received create mutation event on subscription for Post")
170-
// this can be uncommented out once the bug is fixed
171-
// currently the API request for delete mutation is sent with nil version, which
172-
// fails with error message "Conflict resolver rejects mutation."
173-
// because the request failed, subscription does not receive the delete event, and hub event is never fired
174-
// let deleteSyncReceived = expectation(description: "Received delete mutation event on subscription for Post")
148+
let deleteSyncReceived = expectation(description: "Received delete mutation event on subscription for Post")
175149

176150
let hubListener = Amplify.Hub.listen(
177151
to: .dataStore,
@@ -194,8 +168,8 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
194168

195169
if mutationEvent.mutationType == GraphQLMutationType.delete.rawValue {
196170
XCTAssertEqual(post, newPost)
197-
// can be uncommented once delete mutation response is success
198-
// deleteSyncReceived.fulfill()
171+
XCTAssertEqual(mutationEvent.version, 2)
172+
deleteSyncReceived.fulfill()
199173
return
200174
}
201175
}
@@ -227,19 +201,11 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
227201
let queryResult = queryPost(byId: newPost.id)
228202
XCTAssertNil(queryResult)
229203

230-
wait(for: [saveSyncReceived], timeout: networkTimeout)
231-
// can be uncommented once delete mutation response is success
232-
// wait(for: [deleteSyncReceived], timeout: networkTimeout)
204+
wait(for: [saveSyncReceived, deleteSyncReceived], timeout: networkTimeout)
233205

234206
// query the deleted post in eventual consistent state
235207
let queryResultAfterSync = queryPost(byId: newPost.id)
236-
237-
// this should be removed once the bug is fixed, the post should actually be deleted
238-
XCTAssertNotNil(queryResultAfterSync)
239-
XCTAssertEqual(queryResultAfterSync, newPost)
240-
241-
// this is the actual behavior which is currently failing
242-
// XCTAssertNil(post)
208+
XCTAssertNil(queryResultAfterSync)
243209

244210
let queryRequest =
245211
GraphQLRequest<MutationSyncResult?>.query(modelName: newPost.modelName, byId: newPost.id)
@@ -257,13 +223,8 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
257223
XCTAssertEqual(post.model["title"] as? String, newPost.title)
258224
XCTAssertEqual(post.model["content"] as? String, newPost.content)
259225
XCTAssertEqual(post.model["rating"] as? Double, newPost.rating)
260-
// the post should actually be deleted, but this is currently failing
261-
XCTAssertFalse(post.syncMetadata.deleted)
262-
263-
// can be uncommented once delete mutation response is success
264-
// currently the API request for delete mutation is sent with nil version, which
265-
// fails with error message "Conflict resolver rejects mutation."
266-
// XCTAssertTrue(post.syncMetadata.deleted)
226+
XCTAssertTrue(post.syncMetadata.deleted)
227+
XCTAssertEqual(post.syncMetadata.version, 2)
267228
apiQuerySuccess.fulfill()
268229
case .failure(let error):
269230
XCTFail("Error: \(error)")
@@ -333,6 +294,7 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
333294
XCTAssertEqual(post, updatedPost)
334295
// this needs to be commented out once the bug is fixed
335296
// deleteSyncReceived.fulfill()
297+
// XCTAssertEqual(mutationEvent.version, 3)
336298
return
337299
}
338300
}
@@ -415,6 +377,7 @@ class DataStoreConsecutiveUpdatesTests: SyncEngineIntegrationTestBase {
415377
// currently the API request for delete mutation is sent with version 1, which
416378
// fails with error message "Conflict resolver rejects mutation."
417379
// XCTAssertTrue(post.syncMetadata.deleted)
380+
// XCTAssertEqual(post.syncMetadata.version, 3)
418381
apiQuerySuccess.fulfill()
419382
case .failure(let error):
420383
XCTFail("Error: \(error)")

0 commit comments

Comments
 (0)