Skip to content

Commit 51d6cb3

Browse files
author
Di Wu
authored
fix(datastore): detect duplicate mutation event by both modelName and modelId (#2834)
1 parent b3afc1f commit 51d6cb3

File tree

8 files changed

+140
-63
lines changed

8 files changed

+140
-63
lines changed

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
4949
}
5050

5151
MutationEvent.pendingMutationEvents(
52-
for: mutationEvent.modelId,
52+
forMutationEvent: mutationEvent,
5353
storageAdapter: storageAdapter) { result in
5454
switch result {
5555
case .failure(let dataStoreError):

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,9 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
131131
return
132132
}
133133

134-
let remoteModelIds = remoteModels.map { $0.model.identifier }
135-
136134
do {
137135
try storageAdapter.transaction {
138-
queryPendingMutations(forModelIds: remoteModelIds)
136+
queryPendingMutations(withModels: remoteModels.map(\.model))
139137
.subscribe(on: workQueue)
140138
.flatMap { mutationEvents -> Future<([RemoteModel], [LocalMetadata]), DataStoreError> in
141139
let remoteModelsToApply = self.reconcile(remoteModels, pendingMutations: mutationEvents)
@@ -166,7 +164,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
166164
}
167165
}
168166

169-
func queryPendingMutations(forModelIds modelIds: [String]) -> Future<[MutationEvent], DataStoreError> {
167+
func queryPendingMutations(withModels models: [Model]) -> Future<[MutationEvent], DataStoreError> {
170168
Future<[MutationEvent], DataStoreError> { promise in
171169
var result: Result<[MutationEvent], DataStoreError> = .failure(Self.unfulfilledDataStoreError())
172170
guard !self.isCancelled else {
@@ -177,23 +175,25 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
177175
}
178176
guard let storageAdapter = self.storageAdapter else {
179177
let error = DataStoreError.nilStorageAdapter()
180-
self.notifyDropped(count: modelIds.count, error: error)
178+
self.notifyDropped(count: models.count, error: error)
181179
result = .failure(error)
182180
promise(result)
183181
return
184182
}
185183

186-
guard !modelIds.isEmpty else {
184+
guard !models.isEmpty else {
187185
result = .success([])
188186
promise(result)
189187
return
190188
}
191189

192-
MutationEvent.pendingMutationEvents(for: modelIds,
193-
storageAdapter: storageAdapter) { queryResult in
190+
MutationEvent.pendingMutationEvents(
191+
forModels: models,
192+
storageAdapter: storageAdapter
193+
) { queryResult in
194194
switch queryResult {
195195
case .failure(let dataStoreError):
196-
self.notifyDropped(count: modelIds.count, error: dataStoreError)
196+
self.notifyDropped(count: models.count, error: dataStoreError)
197197
result = .failure(dataStoreError)
198198
case .success(let mutationEvents):
199199
result = .success(mutationEvents)

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/MutationEvent+Extensions.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ extension MutationEvent {
4040
storageAdapter: StorageEngineAdapter,
4141
completion: @escaping DataStoreCallback<Void>) {
4242
MutationEvent.pendingMutationEvents(
43-
for: mutationEvent.modelId,
44-
storageAdapter: storageAdapter) { queryResult in
43+
forMutationEvent: mutationEvent,
44+
storageAdapter: storageAdapter
45+
) { queryResult in
4546
switch queryResult {
4647
case .failure(let dataStoreError):
4748
completion(.failure(dataStoreError))

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/MutationEvent+Query.swift

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,55 @@ import Amplify
99
import Dispatch
1010

1111
extension MutationEvent {
12-
static func pendingMutationEvents(for modelId: String,
13-
storageAdapter: StorageEngineAdapter,
14-
completion: @escaping DataStoreCallback<[MutationEvent]>) {
15-
16-
pendingMutationEvents(for: [modelId], storageAdapter: storageAdapter, completion: completion)
12+
static func pendingMutationEvents(
13+
forModel model: Model,
14+
storageAdapter: StorageEngineAdapter,
15+
completion: @escaping DataStoreCallback<[MutationEvent]>
16+
) {
17+
pendingMutationEvents(
18+
forModels: [model],
19+
storageAdapter: storageAdapter,
20+
completion: completion
21+
)
22+
}
23+
24+
static func pendingMutationEvents(
25+
forMutationEvent mutationEvent: MutationEvent,
26+
storageAdapter: StorageEngineAdapter,
27+
completion: @escaping DataStoreCallback<[MutationEvent]>
28+
) {
29+
pendingMutationEvents(
30+
forMutationEvents: [mutationEvent],
31+
storageAdapter: storageAdapter,
32+
completion: completion
33+
)
34+
}
35+
36+
static func pendingMutationEvents(
37+
forMutationEvents mutationEvents: [MutationEvent],
38+
storageAdapter: StorageEngineAdapter,
39+
completion: @escaping DataStoreCallback<[MutationEvent]>
40+
) {
41+
pendingMutationEvents(
42+
for: mutationEvents.map { ($0.modelId, $0.modelName) },
43+
storageAdapter: storageAdapter,
44+
completion: completion
45+
)
46+
}
47+
48+
static func pendingMutationEvents(
49+
forModels models: [Model],
50+
storageAdapter: StorageEngineAdapter,
51+
completion: @escaping DataStoreCallback<[MutationEvent]>
52+
) {
53+
pendingMutationEvents(
54+
for: models.map { ($0.identifier, $0.modelName) },
55+
storageAdapter: storageAdapter,
56+
completion: completion
57+
)
1758
}
1859

19-
static func pendingMutationEvents(for modelIds: [String],
60+
private static func pendingMutationEvents(for modelIds: [(String, String)],
2061
storageAdapter: StorageEngineAdapter,
2162
completion: @escaping DataStoreCallback<[MutationEvent]>) {
2263
Task {
@@ -25,10 +66,10 @@ extension MutationEvent {
2566
let chunkedArrays = modelIds.chunked(into: SQLiteStorageEngineAdapter.maxNumberOfPredicates)
2667
var queriedMutationEvents: [MutationEvent] = []
2768
for chunkedArray in chunkedArrays {
28-
var queryPredicates: [QueryPredicateOperation] = []
29-
for id in chunkedArray {
30-
queryPredicates.append(QueryPredicateOperation(field: fields.modelId.stringValue,
31-
operator: .equals(id)))
69+
var queryPredicates: [QueryPredicateGroup] = []
70+
for (id, name) in chunkedArray {
71+
let operation = fields.modelId == id && fields.modelName == name
72+
queryPredicates.append(operation)
3273
}
3374
let groupedQueryPredicates = QueryPredicateGroup(type: .or, predicates: queryPredicates)
3475
let final = QueryPredicateGroup(type: .and, predicates: [groupedQueryPredicates, predicate])

AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
222222
let expect = expectation(description: "storage adapter error")
223223

224224
storageAdapter = nil
225-
operation.queryPendingMutations(forModelIds: [anyPostMutationSync.model.id])
225+
operation.queryPendingMutations(withModels: [anyPostMutationSync.model])
226226
.sink(receiveCompletion: { completion in
227227
switch completion {
228228
case .failure(let error):
@@ -242,7 +242,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
242242
let expect = expectation(description: "should complete successfully for empty input")
243243
expect.expectedFulfillmentCount = 2
244244

245-
operation.queryPendingMutations(forModelIds: [])
245+
operation.queryPendingMutations(withModels: [])
246246
.sink(receiveCompletion: { completion in
247247
switch completion {
248248
case .failure(let error):
@@ -266,7 +266,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
266266
return .success([self.anyPostMutationEvent])
267267
}
268268
storageAdapter.responders[.queryModelTypePredicate] = queryResponder
269-
operation.queryPendingMutations(forModelIds: [anyPostMutationSync.model.id])
269+
operation.queryPendingMutations(withModels: [anyPostMutationSync.model])
270270
.sink(receiveCompletion: { completion in
271271
switch completion {
272272
case .failure(let error):
@@ -297,7 +297,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
297297
}
298298
}.store(in: &cancellables)
299299
storageAdapter.responders[.queryModelTypePredicate] = queryResponder
300-
operation.queryPendingMutations(forModelIds: [anyPostMutationSync.model.id])
300+
operation.queryPendingMutations(withModels: [anyPostMutationSync.model])
301301
.sink(receiveCompletion: { completion in
302302
switch completion {
303303
case .failure:

AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/Support/MutationEventExtensionsTests.swift

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
3737
version: nil)
3838
let responseMutationSync = createMutationSync(model: post, version: 1)
3939

40-
setUpPendingMutationQueue(modelId, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
40+
setUpPendingMutationQueue(post, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
4141

4242
let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent,
4343
with: requestMutationEvent,
@@ -62,7 +62,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
6262
wait(for: [updatingVersionExpectation], timeout: 1)
6363

6464
// query for head of mutation event table for given model id and check if it has the updated version
65-
MutationEvent.pendingMutationEvents(for: post.id,
65+
MutationEvent.pendingMutationEvents(forModel: post,
6666
storageAdapter: storageAdapter) { result in
6767
switch result {
6868
case .failure(let error):
@@ -106,7 +106,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
106106
version: nil)
107107
let responseMutationSync = createMutationSync(model: post, version: 1)
108108

109-
setUpPendingMutationQueue(modelId,
109+
setUpPendingMutationQueue(post,
110110
[requestMutationEvent, pendingUpdateMutationEvent, pendingDeleteMutationEvent],
111111
pendingUpdateMutationEvent)
112112

@@ -133,7 +133,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
133133
wait(for: [updatingVersionExpectation], timeout: 1)
134134

135135
// query for head of mutation event table for given model id and check if it has the updated version
136-
MutationEvent.pendingMutationEvents(for: post.id,
136+
MutationEvent.pendingMutationEvents(forModel: post,
137137
storageAdapter: storageAdapter) { result in
138138
switch result {
139139
case .failure(let error):
@@ -173,7 +173,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
173173
version: 2)
174174
let responseMutationSync = createMutationSync(model: post1, version: 1)
175175

176-
setUpPendingMutationQueue(modelId, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
176+
setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
177177

178178
let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent,
179179
with: requestMutationEvent,
@@ -197,7 +197,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
197197
wait(for: [updatingVersionExpectation], timeout: 1)
198198

199199
// query for head of mutation event table for given model id and check if it has the correct version
200-
MutationEvent.pendingMutationEvents(for: post1.id,
200+
MutationEvent.pendingMutationEvents(forModel: post1,
201201
storageAdapter: storageAdapter) { result in
202202
switch result {
203203
case .failure(let error):
@@ -237,7 +237,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
237237
version: 1)
238238
let responseMutationSync = createMutationSync(model: post3, version: 2)
239239

240-
setUpPendingMutationQueue(modelId, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
240+
setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
241241

242242
let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent,
243243
with: requestMutationEvent,
@@ -261,7 +261,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
261261
wait(for: [updatingVersionExpectation], timeout: 1)
262262

263263
// query for head of mutation event table for given model id and check if it has the correct version
264-
MutationEvent.pendingMutationEvents(for: post1.id,
264+
MutationEvent.pendingMutationEvents(forModel: post1,
265265
storageAdapter: storageAdapter) { result in
266266
switch result {
267267
case .failure(let error):
@@ -300,7 +300,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
300300
version: 1)
301301
let responseMutationSync = createMutationSync(model: post1, version: 2)
302302

303-
setUpPendingMutationQueue(modelId, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
303+
setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent)
304304

305305
let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent,
306306
with: requestMutationEvent,
@@ -324,7 +324,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
324324
wait(for: [updatingVersionExpectation], timeout: 1)
325325

326326
// query for head of mutation event table for given model id and check if it has the correct version
327-
MutationEvent.pendingMutationEvents(for: post1.id,
327+
MutationEvent.pendingMutationEvents(forModel: post1,
328328
storageAdapter: storageAdapter) { result in
329329
switch result {
330330
case .failure(let error):
@@ -366,7 +366,7 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
366366
return MutationSync(model: AnyModel(model), syncMetadata: metadata)
367367
}
368368

369-
private func setUpPendingMutationQueue(_ modelId: String,
369+
private func setUpPendingMutationQueue(_ model: Model,
370370
_ mutationEvents: [MutationEvent],
371371
_ expectedHeadOfQueue: MutationEvent) {
372372
for mutationEvent in mutationEvents {
@@ -383,8 +383,10 @@ class MutationEventExtensionsTest: BaseDataStoreTests {
383383

384384
// verify the head of queue is expected
385385
let headOfQueueExpectation = expectation(description: "head of mutation event queue is as expected")
386-
MutationEvent.pendingMutationEvents(for: modelId,
387-
storageAdapter: storageAdapter) { result in
386+
MutationEvent.pendingMutationEvents(
387+
forModel: model,
388+
storageAdapter: storageAdapter
389+
) { result in
388390
switch result {
389391
case .failure(let error):
390392
XCTFail("Error : \(error)")

AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/Support/MutationEventQueryTests.swift

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ class MutationEventQueryTests: BaseDataStoreTests {
1818

1919
func testQueryPendingMutation_EmptyResult() {
2020
let querySuccess = expectation(description: "query mutation events success")
21-
let modelIds = [UUID().uuidString, UUID().uuidString]
21+
let mutationEvents = [generateRandomMutationEvent(), generateRandomMutationEvent()]
2222

23-
MutationEvent.pendingMutationEvents(for: modelIds, storageAdapter: storageAdapter) { result in
23+
MutationEvent.pendingMutationEvents(forMutationEvents: mutationEvents, storageAdapter: storageAdapter) { result in
2424
switch result {
2525
case .success(let mutationEvents):
2626
XCTAssertTrue(mutationEvents.isEmpty)
@@ -33,19 +33,17 @@ class MutationEventQueryTests: BaseDataStoreTests {
3333
}
3434

3535
func testQueryPendingMutationEvent() {
36-
let mutationEvent = MutationEvent(id: UUID().uuidString,
37-
modelId: UUID().uuidString,
38-
modelName: Post.modelName,
39-
json: "",
40-
mutationType: .create)
36+
let mutationEvent = generateRandomMutationEvent()
4137

4238
let querySuccess = expectation(description: "query for pending mutation events")
4339

4440
storageAdapter.save(mutationEvent) { result in
4541
switch result {
4642
case .success:
47-
MutationEvent.pendingMutationEvents(for: mutationEvent.modelId,
48-
storageAdapter: self.storageAdapter) { result in
43+
MutationEvent.pendingMutationEvents(
44+
forMutationEvent: mutationEvent,
45+
storageAdapter: self.storageAdapter
46+
) { result in
4947
switch result {
5048
case .success(let mutationEvents):
5149
XCTAssertEqual(mutationEvents.count, 1)
@@ -61,16 +59,8 @@ class MutationEventQueryTests: BaseDataStoreTests {
6159
}
6260

6361
func testQueryPendingMutationEventsForModelIds() {
64-
let mutationEvent1 = MutationEvent(id: UUID().uuidString,
65-
modelId: UUID().uuidString,
66-
modelName: Post.modelName,
67-
json: "",
68-
mutationType: .create)
69-
let mutationEvent2 = MutationEvent(id: UUID().uuidString,
70-
modelId: UUID().uuidString,
71-
modelName: Post.modelName,
72-
json: "",
73-
mutationType: .create)
62+
let mutationEvent1 = generateRandomMutationEvent()
63+
let mutationEvent2 = generateRandomMutationEvent()
7464

7565
let saveMutationEvent1 = expectation(description: "save mutationEvent1 success")
7666
storageAdapter.save(mutationEvent1) { result in
@@ -93,11 +83,13 @@ class MutationEventQueryTests: BaseDataStoreTests {
9383
wait(for: [saveMutationEvent2], timeout: 1)
9484

9585
let querySuccess = expectation(description: "query for metadata success")
96-
var modelIds = [mutationEvent1.modelId]
97-
modelIds.append(contentsOf: (1 ... 999).map { _ in UUID().uuidString })
98-
modelIds.append(mutationEvent2.modelId)
99-
MutationEvent.pendingMutationEvents(for: modelIds,
100-
storageAdapter: storageAdapter) { result in
86+
var mutationEvents = [mutationEvent1]
87+
mutationEvents.append(contentsOf: (1 ... 999).map { _ in generateRandomMutationEvent() })
88+
mutationEvents.append(mutationEvent2)
89+
MutationEvent.pendingMutationEvents(
90+
forMutationEvents: mutationEvents,
91+
storageAdapter: storageAdapter
92+
) { result in
10193
switch result {
10294
case .success(let mutationEvents):
10395
XCTAssertEqual(mutationEvents.count, 2)
@@ -108,4 +100,14 @@ class MutationEventQueryTests: BaseDataStoreTests {
108100

109101
wait(for: [querySuccess], timeout: 1)
110102
}
103+
104+
private func generateRandomMutationEvent() -> MutationEvent {
105+
MutationEvent(
106+
id: UUID().uuidString,
107+
modelId: UUID().uuidString,
108+
modelName: Post.modelName,
109+
json: "",
110+
mutationType: .create
111+
)
112+
}
111113
}

0 commit comments

Comments
 (0)