Skip to content

Commit 2abaf4c

Browse files
authored
fix(datastore): update pending mutation events version from mutation response (#3452)
* fix(datastore): update pending mutation events version from mutation response * resolve comments
1 parent 3b2ceba commit 2abaf4c

File tree

4 files changed

+544
-9
lines changed

4 files changed

+544
-9
lines changed

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,16 +249,22 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
249249
guard let reconciliationQueue = reconciliationQueue else {
250250
let dataStoreError = DataStoreError.configuration(
251251
"reconciliationQueue is unexpectedly nil",
252-
"""
253-
The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
254-
\(AmplifyErrorMessages.reportBugToAWS())
255-
"""
252+
"""
253+
The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
254+
\(AmplifyErrorMessages.reportBugToAWS())
255+
"""
256256
)
257257
stateMachine.notify(action: .errored(dataStoreError))
258258
return
259259
}
260260
reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName)
261-
completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
261+
MutationEvent.reconcilePendingMutationEventsVersion(
262+
sent: mutationEvent,
263+
received: mutationSync,
264+
storageAdapter: storageAdapter
265+
) { _ in
266+
self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
267+
}
262268
} else {
263269
completeProcessingEvent(mutationEvent)
264270
}

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,33 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
103103
}
104104
}
105105

106+
/// Always retrieve and use the largest version when available. The source of the version comes
107+
/// from either the MutationEvent itself, which represents the queue request, or the persisted version
108+
/// from the metadata table.
109+
///
110+
/// **Version in the Mutation Event**. If there are mulitple mutation events pending, each outgoing
111+
/// mutation processing will result in synchronously updating the pending mutation's version
112+
/// before enqueuing the mutation response for reconciliation.
113+
///
114+
/// **Version persisted in the metadata table**: Reconciliation will persist the latest version in the
115+
/// metadata table. In cases of quick consecutive updates, the MutationEvent's version could
116+
/// be greater than the persisted since the MutationEvent is updated from the original thread that
117+
/// processed the outgoing mutation.
118+
private func getLatestVersion(_ mutationEvent: MutationEvent) -> Int? {
119+
let latestSyncedMetadataVersion = getLatestSyncMetadata()?.version
120+
let mutationEventVersion = mutationEvent.version
121+
switch (latestSyncedMetadataVersion, mutationEventVersion) {
122+
case let (.some(syncedVersion), .some(version)):
123+
return max(syncedVersion, version)
124+
case let (.some(syncedVersion), .none):
125+
return syncedVersion
126+
case let (.none, .some(version)):
127+
return version
128+
case (.none, .none):
129+
return nil
130+
}
131+
}
132+
106133
/// Creates a GraphQLRequest based on given `mutationType`
107134
/// - Parameters:
108135
/// - mutationType: mutation type
@@ -112,7 +139,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
112139
mutationType: GraphQLMutationType,
113140
authType: AWSAuthorizationType? = nil
114141
) -> GraphQLRequest<MutationSync<AnyModel>>? {
115-
let latestSyncMetadata = getLatestSyncMetadata()
142+
let version = getLatestVersion(mutationEvent)
116143
var request: GraphQLRequest<MutationSync<AnyModel>>
117144

118145
do {
@@ -133,7 +160,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
133160
request = GraphQLRequest<MutationSyncResult>.deleteMutation(of: model,
134161
modelSchema: modelSchema,
135162
where: graphQLFilter,
136-
version: latestSyncMetadata?.version)
163+
version: version)
137164
case .update:
138165
let model = try mutationEvent.decodeModel()
139166
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
@@ -145,7 +172,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
145172
request = GraphQLRequest<MutationSyncResult>.updateMutation(of: model,
146173
modelSchema: modelSchema,
147174
where: graphQLFilter,
148-
version: latestSyncMetadata?.version)
175+
version: version)
149176
case .create:
150177
let model = try mutationEvent.decodeModel()
151178
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
@@ -156,7 +183,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
156183
}
157184
request = GraphQLRequest<MutationSyncResult>.createMutation(of: model,
158185
modelSchema: modelSchema,
159-
version: latestSyncMetadata?.version)
186+
version: version)
160187
}
161188
} catch {
162189
let apiError = APIError.unknown("Couldn't decode model", "", error)
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import Dispatch
10+
import AWSPluginsCore
11+
12+
extension MutationEvent {
13+
// Consecutive operations that modify a model results in a sequence of pending mutation events that
14+
// have the current version of the model. The first mutation event has the correct version of the model,
15+
// while the subsequent events will have lower versions if the first mutation event is successfully synced
16+
// to the cloud. By reconciling the pending mutation events after syncing the first mutation event,
17+
// we attempt to update the pending version to the latest version from the response.
18+
// The before and after conditions for consecutive update scenarios are as below:
19+
// - Save, then immediately update
20+
// Queue Before - [(version: nil, inprocess: true, type: .create),
21+
// (version: nil, inprocess: false, type: .update)]
22+
// Response - [version: 1, type: .create]
23+
// Queue After - [(version: 1, inprocess: false, type: .update)]
24+
// - Save, then immediately delete
25+
// Queue Before - [(version: nil, inprocess: true, type: .create),
26+
// (version: nil, inprocess: false, type: .delete)]
27+
// Response - [version: 1, type: .create]
28+
// Queue After - [(version: 1, inprocess: false, type: .delete)]
29+
// - Save, sync, then immediately update and delete
30+
// Queue Before (After save, sync)
31+
// - [(version: 1, inprocess: true, type: .update), (version: 1, inprocess: false, type: .delete)]
32+
// Response - [version: 2, type: .update]
33+
// Queue After - [(version: 2, inprocess: false, type: .delete)]
34+
//
35+
// For a given model `id`, checks the version of the head of pending mutation event queue
36+
// against the API response version in `mutationSync` and saves it in the mutation event table if
37+
// the response version is a newer one
38+
static func reconcilePendingMutationEventsVersion(sent mutationEvent: MutationEvent,
39+
received mutationSync: MutationSync<AnyModel>,
40+
storageAdapter: StorageEngineAdapter,
41+
completion: @escaping DataStoreCallback<Void>) {
42+
MutationEvent.pendingMutationEvents(
43+
forMutationEvent: mutationEvent,
44+
storageAdapter: storageAdapter
45+
) { queryResult in
46+
switch queryResult {
47+
case .failure(let dataStoreError):
48+
completion(.failure(dataStoreError))
49+
case .success(let localMutationEvents):
50+
guard let existingEvent = localMutationEvents.first else {
51+
completion(.success(()))
52+
return
53+
}
54+
55+
guard let reconciledEvent = reconcile(pendingMutationEvent: existingEvent,
56+
with: mutationEvent,
57+
responseMutationSync: mutationSync) else {
58+
completion(.success(()))
59+
return
60+
}
61+
62+
storageAdapter.save(reconciledEvent, condition: nil, eagerLoad: true) { result in
63+
switch result {
64+
case .failure(let dataStoreError):
65+
completion(.failure(dataStoreError))
66+
case .success:
67+
completion(.success(()))
68+
}
69+
}
70+
}
71+
}
72+
}
73+
74+
static func reconcile(pendingMutationEvent: MutationEvent,
75+
with requestMutationEvent: MutationEvent,
76+
responseMutationSync: MutationSync<AnyModel>) -> MutationEvent? {
77+
// return if version of the pending mutation event is not nil and
78+
// is >= version contained in the response
79+
if pendingMutationEvent.version != nil &&
80+
pendingMutationEvent.version! >= responseMutationSync.syncMetadata.version {
81+
return nil
82+
}
83+
84+
do {
85+
let responseModel = responseMutationSync.model.instance
86+
let requestModel = try requestMutationEvent.decodeModel()
87+
88+
// check if the data sent in the request is the same as the response
89+
// if it is, update the pending mutation event version to the response version
90+
guard let modelSchema = ModelRegistry.modelSchema(from: requestMutationEvent.modelName),
91+
modelSchema.compare(responseModel, requestModel) else {
92+
return nil
93+
}
94+
95+
var pendingMutationEvent = pendingMutationEvent
96+
pendingMutationEvent.version = responseMutationSync.syncMetadata.version
97+
return pendingMutationEvent
98+
} catch {
99+
Amplify.log.verbose("Error decoding models: \(error)")
100+
return nil
101+
}
102+
}
103+
104+
}

0 commit comments

Comments
 (0)