Skip to content

Commit c8ab069

Browse files
author
Di Wu
authored
fix(datastore-v1): detect duplicate mutation event by both modelName and modelId (#2835)
1 parent c787436 commit c8ab069

File tree

8 files changed

+177
-68
lines changed

8 files changed

+177
-68
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
5151
}
5252

5353
MutationEvent.pendingMutationEvents(
54-
for: mutationEvent.modelId,
54+
forMutationEvent: mutationEvent,
5555
storageAdapter: storageAdapter) { result in
5656
switch result {
5757
case .failure(let dataStoreError):

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,9 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
124124
return
125125
}
126126

127-
let remoteModelIds = remoteModels.map { $0.model.identifier }
128-
129127
do {
130128
try storageAdapter.transaction {
131-
queryPendingMutations(forModelIds: remoteModelIds)
129+
queryPendingMutations(forModels: remoteModels.map(\.model))
132130
.subscribe(on: workQueue)
133131
.flatMap { mutationEvents -> Future<([RemoteModel], [LocalMetadata]), DataStoreError> in
134132
let remoteModelsToApply = self.reconcile(remoteModels, pendingMutations: mutationEvents)
@@ -159,7 +157,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
159157
}
160158
}
161159

162-
func queryPendingMutations(forModelIds modelIds: [Model.Identifier]) -> Future<[MutationEvent], DataStoreError> {
160+
func queryPendingMutations(forModels models: [Model]) -> Future<[MutationEvent], DataStoreError> {
163161
Future<[MutationEvent], DataStoreError> { promise in
164162
var result: Result<[MutationEvent], DataStoreError> = .failure(Self.unfulfilledDataStoreError())
165163
defer {
@@ -172,21 +170,23 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
172170
}
173171
guard let storageAdapter = self.storageAdapter else {
174172
let error = DataStoreError.nilStorageAdapter()
175-
self.notifyDropped(count: modelIds.count, error: error)
173+
self.notifyDropped(count: models.count, error: error)
176174
result = .failure(error)
177175
return
178176
}
179177

180-
guard !modelIds.isEmpty else {
178+
guard !models.isEmpty else {
181179
result = .success([])
182180
return
183181
}
184182

185-
MutationEvent.pendingMutationEvents(for: modelIds,
186-
storageAdapter: storageAdapter) { queryResult in
183+
MutationEvent.pendingMutationEvents(
184+
forModels: models,
185+
storageAdapter: storageAdapter
186+
) { queryResult in
187187
switch queryResult {
188188
case .failure(let dataStoreError):
189-
self.notifyDropped(count: modelIds.count, error: dataStoreError)
189+
self.notifyDropped(count: models.count, error: dataStoreError)
190190
result = .failure(dataStoreError)
191191
case .success(let mutationEvents):
192192
result = .success(mutationEvents)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/Support/MutationEvent+Extensions.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ extension MutationEvent {
4040
storageAdapter: StorageEngineAdapter,
4141
completion: @escaping DataStoreCallback<Void>) {
4242
MutationEvent.pendingMutationEvents(
43-
for: mutationEvent.modelId,
43+
forMutationEvent: mutationEvent,
4444
storageAdapter: storageAdapter) { queryResult in
4545
switch queryResult {
4646
case .failure(let dataStoreError):

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/Support/MutationEvent+Query.swift

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,70 @@ import Amplify
99
import Dispatch
1010

1111
extension MutationEvent {
12-
static func pendingMutationEvents(for modelId: Model.Identifier,
13-
storageAdapter: StorageEngineAdapter,
14-
completion: DataStoreCallback<[MutationEvent]>) {
1512

16-
pendingMutationEvents(for: [modelId], storageAdapter: storageAdapter, completion: completion)
13+
static func pendingMutationEvents(
14+
forModel model: Model,
15+
storageAdapter: StorageEngineAdapter,
16+
completion: DataStoreCallback<[MutationEvent]>
17+
) {
18+
pendingMutationEvents(
19+
forModels: [model],
20+
storageAdapter: storageAdapter,
21+
completion: completion
22+
)
1723
}
1824

19-
static func pendingMutationEvents(for modelIds: [Model.Identifier],
20-
storageAdapter: StorageEngineAdapter,
21-
completion: DataStoreCallback<[MutationEvent]>) {
25+
static func pendingMutationEvents(
26+
forModels models: [Model],
27+
storageAdapter: StorageEngineAdapter,
28+
completion: DataStoreCallback<[MutationEvent]>
29+
) {
30+
pendingMutationEvents(
31+
for: models.map { ($0.identifier, $0.modelName) },
32+
storageAdapter: storageAdapter,
33+
completion: completion
34+
)
35+
}
36+
37+
static func pendingMutationEvents(
38+
forMutationEvent mutationEvent: MutationEvent,
39+
storageAdapter: StorageEngineAdapter,
40+
completion: DataStoreCallback<[MutationEvent]>
41+
) {
42+
pendingMutationEvents(
43+
forMutationEvents: [mutationEvent],
44+
storageAdapter: storageAdapter,
45+
completion: completion
46+
)
47+
}
48+
49+
static func pendingMutationEvents(
50+
forMutationEvents mutationEvents: [MutationEvent],
51+
storageAdapter: StorageEngineAdapter,
52+
completion: DataStoreCallback<[MutationEvent]>
53+
) {
54+
pendingMutationEvents(
55+
for: mutationEvents.map { ($0.modelId, $0.modelName) },
56+
storageAdapter: storageAdapter,
57+
completion: completion
58+
)
59+
}
60+
61+
private static func pendingMutationEvents(
62+
for modelIds: [(String, String)],
63+
storageAdapter: StorageEngineAdapter,
64+
completion: DataStoreCallback<[MutationEvent]>
65+
) {
2266
let fields = MutationEvent.keys
2367
let predicate = (fields.inProcess == false || fields.inProcess == nil)
2468
var queriedMutationEvents: [MutationEvent] = []
2569
var queryError: DataStoreError?
2670
let chunkedArrays = modelIds.chunked(into: SQLiteStorageEngineAdapter.maxNumberOfPredicates)
2771
for chunkedArray in chunkedArrays {
28-
var queryPredicates: [QueryPredicateOperation] = []
29-
for id in chunkedArray {
30-
queryPredicates.append(QueryPredicateOperation(field: fields.modelId.stringValue,
31-
operator: .equals(id)))
72+
var queryPredicates: [QueryPredicateGroup] = []
73+
for (id, name) in chunkedArray {
74+
let opration = fields.modelId == id && fields.modelName == name
75+
queryPredicates.append(opration)
3276
}
3377
let groupedQueryPredicates = QueryPredicateGroup(type: .or, predicates: queryPredicates)
3478
let final = QueryPredicateGroup(type: .and, predicates: [groupedQueryPredicates, predicate])

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreEndToEndTests.swift

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,54 @@ class DataStoreEndToEndTests: SyncEngineIntegrationTestBase {
679679
sink.cancel()
680680
}
681681

682+
/// Create model instances for different models but same primary key value
683+
///
684+
/// Given - DataStore with clean state
685+
/// When
686+
/// - create post and comment with the same random id
687+
/// Then
688+
/// - all instances should be created successfully with the same id
689+
func testCreateModelInstances_withSamePrimaryKeyForDifferentModels_allSucceed() throws {
690+
setUp(withModels: TestModelRegistration())
691+
try startAmplifyAndWaitForSync()
692+
693+
let uuid = UUID().uuidString
694+
let newPost = Post(
695+
id: uuid,
696+
title: UUID().uuidString,
697+
content: UUID().uuidString,
698+
createdAt: .now()
699+
)
700+
701+
let postCreateExpectation = expectation(description: "Post is created")
702+
var createdPost: Post?
703+
Amplify.DataStore.save(newPost) { result in
704+
defer { postCreateExpectation.fulfill() }
705+
if case .success(let post) = result {
706+
createdPost = post
707+
}
708+
}
709+
let newComment = Comment(
710+
id: uuid,
711+
content: UUID().uuidString,
712+
createdAt: .now(),
713+
post: newPost
714+
)
715+
var createdComment: Comment?
716+
let commentCreateExpectation = expectation(description: "Comment is created")
717+
Amplify.DataStore.save(newComment) { result in
718+
defer { commentCreateExpectation.fulfill() }
719+
if case .success(let comment) = result {
720+
createdComment = comment
721+
}
722+
}
723+
wait(for: [postCreateExpectation, commentCreateExpectation], timeout: 5)
724+
725+
XCTAssertNotNil(createdPost)
726+
XCTAssertNotNil(createdComment)
727+
XCTAssertEqual(createdPost?.id, createdComment?.id)
728+
}
729+
682730
///
683731
/// - Given: DataStore with clean state
684732
/// - When:

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
219219
let expect = expectation(description: "storage adapter error")
220220

221221
storageAdapter = nil
222-
operation.queryPendingMutations(forModelIds: [anyPostMutationSync.model.id])
222+
operation.queryPendingMutations(forModels: [anyPostMutationSync.model])
223223
.sink(receiveCompletion: { completion in
224224
switch completion {
225225
case .failure(let error):
@@ -239,7 +239,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
239239
let expect = expectation(description: "should complete successfully for empty input")
240240
expect.expectedFulfillmentCount = 2
241241

242-
operation.queryPendingMutations(forModelIds: [])
242+
operation.queryPendingMutations(forModels: [])
243243
.sink(receiveCompletion: { completion in
244244
switch completion {
245245
case .failure(let error):
@@ -263,7 +263,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
263263
return .success([self.anyPostMutationEvent])
264264
}
265265
storageAdapter.responders[.queryModelTypePredicate] = queryResponder
266-
operation.queryPendingMutations(forModelIds: [anyPostMutationSync.model.id])
266+
operation.queryPendingMutations(forModels: [anyPostMutationSync.model])
267267
.sink(receiveCompletion: { completion in
268268
switch completion {
269269
case .failure(let error):
@@ -294,7 +294,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
294294
}
295295
}.store(in: &cancellables)
296296
storageAdapter.responders[.queryModelTypePredicate] = queryResponder
297-
operation.queryPendingMutations(forModelIds: [anyPostMutationSync.model.id])
297+
operation.queryPendingMutations(forModels: [anyPostMutationSync.model])
298298
.sink(receiveCompletion: { completion in
299299
switch completion {
300300
case .failure:
@@ -694,6 +694,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
694694
expect.fulfill()
695695
}).store(in: &cancellables)
696696
waitForExpectations(timeout: 1)
697+
Amplify.Hub.removeListener(hubListener)
697698
}
698699

699700
func testApplyRemoteModels_deleteDisposition() {
@@ -750,6 +751,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
750751
expect.fulfill()
751752
}).store(in: &cancellables)
752753
waitForExpectations(timeout: 1)
754+
Amplify.Hub.removeListener(hubListener)
753755
}
754756

755757
func testApplyRemoteModels_multipleDispositions() {
@@ -826,6 +828,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
826828
expect.fulfill()
827829
}).store(in: &cancellables)
828830
waitForExpectations(timeout: 1)
831+
Amplify.Hub.removeListener(hubListener)
829832
}
830833

831834
func testApplyRemoteModels_saveFail() throws {

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/Support/MutationEventExtensionsTests.swift

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
3838
version: nil)
3939
let responseMutationSync = createMutationSync(model: post, version: 1)
4040

41-
setUpPendingMutationQueue(modelId, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
41+
setUpPendingMutationQueue(post, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
4242

4343
let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent,
4444
with: requestMutationEvent,
@@ -63,8 +63,10 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
6363
wait(for: [updatingVersionExpectation], timeout: 1)
6464

6565
// query for head of mutation event table for given model id and check if it has the updated version
66-
MutationEvent.pendingMutationEvents(for: post.id,
67-
storageAdapter: storageAdapter) { result in
66+
MutationEvent.pendingMutationEvents(
67+
forModel: post,
68+
storageAdapter: storageAdapter
69+
) { result in
6870
switch result {
6971
case .failure(let error):
7072
XCTFail("Error : \(error)")
@@ -107,7 +109,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
107109
version: nil)
108110
let responseMutationSync = createMutationSync(model: post, version: 1)
109111

110-
setUpPendingMutationQueue(modelId,
112+
setUpPendingMutationQueue(post,
111113
[requestMutationEvent, pendingUpdateMutationEvent, pendingDeleteMutationEvent],
112114
pendingUpdateMutationEvent)
113115

@@ -134,8 +136,10 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
134136
wait(for: [updatingVersionExpectation], timeout: 1)
135137

136138
// query for head of mutation event table for given model id and check if it has the updated version
137-
MutationEvent.pendingMutationEvents(for: post.id,
138-
storageAdapter: storageAdapter) { result in
139+
MutationEvent.pendingMutationEvents(
140+
forModel: post,
141+
storageAdapter: storageAdapter
142+
) { result in
139143
switch result {
140144
case .failure(let error):
141145
XCTFail("Error : \(error)")
@@ -174,7 +178,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
174178
version: 2)
175179
let responseMutationSync = createMutationSync(model: post1, version: 1)
176180

177-
setUpPendingMutationQueue(modelId, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
181+
setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
178182

179183
let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent,
180184
with: requestMutationEvent,
@@ -198,8 +202,10 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
198202
wait(for: [updatingVersionExpectation], timeout: 1)
199203

200204
// query for head of mutation event table for given model id and check if it has the correct version
201-
MutationEvent.pendingMutationEvents(for: post1.id,
202-
storageAdapter: storageAdapter) { result in
205+
MutationEvent.pendingMutationEvents(
206+
forModel: post1,
207+
storageAdapter: storageAdapter
208+
) { result in
203209
switch result {
204210
case .failure(let error):
205211
XCTFail("Error : \(error)")
@@ -238,7 +244,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
238244
version: 1)
239245
let responseMutationSync = createMutationSync(model: post3, version: 2)
240246

241-
setUpPendingMutationQueue(modelId, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
247+
setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
242248

243249
let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent,
244250
with: requestMutationEvent,
@@ -262,8 +268,10 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
262268
wait(for: [updatingVersionExpectation], timeout: 1)
263269

264270
// query for head of mutation event table for given model id and check if it has the correct version
265-
MutationEvent.pendingMutationEvents(for: post1.id,
266-
storageAdapter: storageAdapter) { result in
271+
MutationEvent.pendingMutationEvents(
272+
forModel: post1,
273+
storageAdapter: storageAdapter
274+
) { result in
267275
switch result {
268276
case .failure(let error):
269277
XCTFail("Error : \(error)")
@@ -301,7 +309,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
301309
version: 1)
302310
let responseMutationSync = createMutationSync(model: post1, version: 2)
303311

304-
setUpPendingMutationQueue(modelId, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
312+
setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
305313

306314
let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent,
307315
with: requestMutationEvent,
@@ -325,8 +333,10 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
325333
wait(for: [updatingVersionExpectation], timeout: 1)
326334

327335
// query for head of mutation event table for given model id and check if it has the correct version
328-
MutationEvent.pendingMutationEvents(for: post1.id,
329-
storageAdapter: storageAdapter) { result in
336+
MutationEvent.pendingMutationEvents(
337+
forModel: post1,
338+
storageAdapter: storageAdapter
339+
) { result in
330340
switch result {
331341
case .failure(let error):
332342
XCTFail("Error : \(error)")
@@ -367,7 +377,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
367377
return MutationSync(model: AnyModel(model), syncMetadata: metadata)
368378
}
369379

370-
private func setUpPendingMutationQueue(_ modelId: String,
380+
private func setUpPendingMutationQueue(_ model: Model,
371381
_ mutationEvents: [MutationEvent],
372382
_ expectedHeadOfQueue: MutationEvent) {
373383
for mutationEvent in mutationEvents {
@@ -384,8 +394,10 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
384394

385395
// verify the head of queue is expected
386396
let headOfQueueExpectation = expectation(description: "head of mutation event queue is as expected")
387-
MutationEvent.pendingMutationEvents(for: modelId,
388-
storageAdapter: storageAdapter) { result in
397+
MutationEvent.pendingMutationEvents(
398+
forModel: model,
399+
storageAdapter: storageAdapter
400+
) { result in
389401
switch result {
390402
case .failure(let error):
391403
XCTFail("Error : \(error)")

0 commit comments

Comments
 (0)