Skip to content

Commit d35e9a7

Browse files
authored
fix(datastore-v1): update pending mutation events version from mutation response (#3458)
1 parent 557fc42 commit d35e9a7

File tree

6 files changed

+558
-6
lines changed

6 files changed

+558
-6
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,13 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
262262
return
263263
}
264264
reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName)
265-
completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
265+
MutationEvent.reconcilePendingMutationEventsVersion(
266+
sent: mutationEvent,
267+
received: mutationSync,
268+
storageAdapter: storageAdapter
269+
) { _ in
270+
self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
271+
}
266272
} else {
267273
completeProcessingEvent(mutationEvent)
268274
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,33 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
102102
}
103103
}
104104

105+
/// Always retrieve and use the largest version when available. The source of the version comes
106+
/// from either the MutationEvent itself, which represents the queue request, or the persisted version
107+
/// from the metadata table.
108+
///
109+
/// **Version in the Mutation Event**. If there are mulitple mutation events pending, each outgoing
110+
/// mutation processing will result in synchronously updating the pending mutation's version
111+
/// before enqueuing the mutation response for reconciliation.
112+
///
113+
/// **Version persisted in the metadata table**: Reconciliation will persist the latest version in the
114+
/// metadata table. In cases of quick consecutive updates, the MutationEvent's version could
115+
/// be greater than the persisted since the MutationEvent is updated from the original thread that
116+
/// processed the outgoing mutation.
117+
private func getLatestVersion(_ mutationEvent: MutationEvent) -> Int? {
118+
let latestSyncedMetadataVersion = getLatestSyncMetadata()?.version
119+
let mutationEventVersion = mutationEvent.version
120+
switch (latestSyncedMetadataVersion, mutationEventVersion) {
121+
case let (.some(syncedVersion), .some(version)):
122+
return max(syncedVersion, version)
123+
case let (.some(syncedVersion), .none):
124+
return syncedVersion
125+
case let (.none, .some(version)):
126+
return version
127+
case (.none, .none):
128+
return nil
129+
}
130+
}
131+
105132
/// Creates a GraphQLRequest based on given `mutationType`
106133
/// - Parameters:
107134
/// - mutationType: mutation type
@@ -112,7 +139,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
112139
authType: AWSAuthorizationType? = nil
113140
) -> GraphQLRequest<MutationSync<AnyModel>>? {
114141
var request: GraphQLRequest<MutationSync<AnyModel>>
115-
let latestSyncMetadata = getLatestSyncMetadata()
142+
let version = getLatestVersion(mutationEvent)
116143
do {
117144
var graphQLFilter: GraphQLFilter?
118145
if let graphQLFilterJSON = mutationEvent.graphQLFilterJSON {
@@ -131,7 +158,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
131158
request = GraphQLRequest<MutationSyncResult>.deleteMutation(of: model,
132159
modelSchema: modelSchema,
133160
where: graphQLFilter,
134-
version: latestSyncMetadata?.version)
161+
version: version)
135162
case .update:
136163
let model = try mutationEvent.decodeModel()
137164
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
@@ -143,7 +170,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
143170
request = GraphQLRequest<MutationSyncResult>.updateMutation(of: model,
144171
modelSchema: modelSchema,
145172
where: graphQLFilter,
146-
version: latestSyncMetadata?.version)
173+
version: version)
147174
case .create:
148175
let model = try mutationEvent.decodeModel()
149176
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
@@ -154,7 +181,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
154181
}
155182
request = GraphQLRequest<MutationSyncResult>.createMutation(of: model,
156183
modelSchema: modelSchema,
157-
version: latestSyncMetadata?.version)
184+
version: version)
158185
}
159186
} catch {
160187
let apiError = APIError.unknown("Couldn't decode model", "", error)
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import Dispatch
10+
import AWSPluginsCore
11+
12+
extension MutationEvent {
13+
// Consecutive operations that modify a model results in a sequence of pending mutation events that
14+
// have the current version of the model. The first mutation event has the correct version of the model,
15+
// while the subsequent events will have lower versions if the first mutation event is successfully synced
16+
// to the cloud. By reconciling the pending mutation events after syncing the first mutation event,
17+
// we attempt to update the pending version to the latest version from the response.
18+
// The before and after conditions for consecutive update scenarios are as below:
19+
// - Save, then immediately update
20+
// Queue Before - [(version: nil, inprocess: true, type: .create),
21+
// (version: nil, inprocess: false, type: .update)]
22+
// Response - [version: 1, type: .create]
23+
// Queue After - [(version: 1, inprocess: false, type: .update)]
24+
// - Save, then immediately delete
25+
// Queue Before - [(version: nil, inprocess: true, type: .create),
26+
// (version: nil, inprocess: false, type: .delete)]
27+
// Response - [version: 1, type: .create]
28+
// Queue After - [(version: 1, inprocess: false, type: .delete)]
29+
// - Save, sync, then immediately update and delete
30+
// Queue Before (After save, sync)
31+
// - [(version: 1, inprocess: true, type: .update), (version: 1, inprocess: false, type: .delete)]
32+
// Response - [version: 2, type: .update]
33+
// Queue After - [(version: 2, inprocess: false, type: .delete)]
34+
//
35+
// For a given model `id`, checks the version of the head of pending mutation event queue
36+
// against the API response version in `mutationSync` and saves it in the mutation event table if
37+
// the response version is a newer one
38+
static func reconcilePendingMutationEventsVersion(sent mutationEvent: MutationEvent,
39+
received mutationSync: MutationSync<AnyModel>,
40+
storageAdapter: StorageEngineAdapter,
41+
completion: @escaping DataStoreCallback<Void>) {
42+
MutationEvent.pendingMutationEvents(
43+
forMutationEvent: mutationEvent,
44+
storageAdapter: storageAdapter) { queryResult in
45+
switch queryResult {
46+
case .failure(let dataStoreError):
47+
completion(.failure(dataStoreError))
48+
case .success(let localMutationEvents):
49+
guard let existingEvent = localMutationEvents.first else {
50+
completion(.success(()))
51+
return
52+
}
53+
54+
guard let reconciledEvent = reconcile(pendingMutationEvent: existingEvent,
55+
with: mutationEvent,
56+
responseMutationSync: mutationSync) else {
57+
completion(.success(()))
58+
return
59+
}
60+
61+
storageAdapter.save(reconciledEvent, condition: nil) { result in
62+
switch result {
63+
case .failure(let dataStoreError):
64+
completion(.failure(dataStoreError))
65+
case .success:
66+
completion(.success(()))
67+
}
68+
}
69+
}
70+
}
71+
}
72+
73+
static func reconcile(pendingMutationEvent: MutationEvent,
74+
with requestMutationEvent: MutationEvent,
75+
responseMutationSync: MutationSync<AnyModel>) -> MutationEvent? {
76+
// return if version of the pending mutation event is not nil and
77+
// is >= version contained in the response
78+
if pendingMutationEvent.version != nil &&
79+
pendingMutationEvent.version! >= responseMutationSync.syncMetadata.version {
80+
return nil
81+
}
82+
83+
do {
84+
let responseModel = responseMutationSync.model.instance
85+
let requestModel = try requestMutationEvent.decodeModel()
86+
87+
// check if the data sent in the request is the same as the response
88+
// if it is, update the pending mutation event version to the response version
89+
guard let modelSchema = ModelRegistry.modelSchema(from: requestMutationEvent.modelName),
90+
modelSchema.compare(responseModel, requestModel) else {
91+
return nil
92+
}
93+
94+
var pendingMutationEvent = pendingMutationEvent
95+
pendingMutationEvent.version = responseMutationSync.syncMetadata.version
96+
return pendingMutationEvent
97+
} catch {
98+
Amplify.log.verbose("Error decoding models: \(error)")
99+
return nil
100+
}
101+
}
102+
103+
}

0 commit comments

Comments
 (0)