Skip to content

Commit f1d354c

Browse files
authored
fix(DataStore): improve MutationEvent resiliency to interruptions (#3492)
* fix(DataStore): save and syncMutation under transaction * fix(DataStore): MutationEvent dequeue include inProcess true events * fix(DataStore): throw on missing syncEngine to rollback save * AWSMutationDatabaseAdapter.getNextMutationEvent doc comment and tests * unit tests for save transaction * Add integration test * address PR comments * fix test
1 parent ccbce93 commit f1d354c

File tree

10 files changed

+589
-75
lines changed

10 files changed

+589
-75
lines changed

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/SQLite/StorageEngineAdapter+SQLite.swift

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,18 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
148148
condition: QueryPredicate? = nil,
149149
eagerLoad: Bool = true,
150150
completion: DataStoreCallback<M>) {
151+
completion(save(model,
152+
modelSchema: modelSchema,
153+
condition: condition,
154+
eagerLoad: eagerLoad))
155+
}
156+
157+
func save<M: Model>(_ model: M,
158+
modelSchema: ModelSchema,
159+
condition: QueryPredicate? = nil,
160+
eagerLoad: Bool = true) -> DataStoreResult<M> {
151161
guard let connection = connection else {
152-
completion(.failure(DataStoreError.nilSQLiteConnection()))
153-
return
162+
return .failure(DataStoreError.nilSQLiteConnection())
154163
}
155164
do {
156165
let modelType = type(of: model)
@@ -162,8 +171,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
162171
let dataStoreError = DataStoreError.invalidCondition(
163172
"Cannot apply a condition on model which does not exist.",
164173
"Save the model instance without a condition first.")
165-
completion(.failure(causedBy: dataStoreError))
166-
return
174+
return .failure(causedBy: dataStoreError)
167175
}
168176

169177
let statement = InsertStatement(model: model, modelSchema: modelSchema)
@@ -179,9 +187,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
179187
let dataStoreError = DataStoreError.invalidCondition(
180188
"Save failed due to condition did not match existing model instance.",
181189
"The save will continue to fail until the model instance is updated.")
182-
completion(.failure(causedBy: dataStoreError))
183-
184-
return
190+
return .failure(causedBy: dataStoreError)
185191
}
186192
}
187193

@@ -192,23 +198,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
192198
}
193199

194200
// load the recent saved instance and pass it back to the callback
195-
query(modelType, modelSchema: modelSchema,
196-
predicate: model.identifier(schema: modelSchema).predicate,
197-
eagerLoad: eagerLoad) {
198-
switch $0 {
199-
case .success(let result):
200-
if let saved = result.first {
201-
completion(.success(saved))
202-
} else {
203-
completion(.failure(.nonUniqueResult(model: modelType.modelName,
204-
count: result.count)))
205-
}
206-
case .failure(let error):
207-
completion(.failure(error))
201+
let queryResult = query(modelType, modelSchema: modelSchema,
202+
predicate: model.identifier(schema: modelSchema).predicate,
203+
eagerLoad: eagerLoad)
204+
switch queryResult {
205+
case .success(let result):
206+
if let saved = result.first {
207+
return .success(saved)
208+
} else {
209+
return .failure(.nonUniqueResult(model: modelType.modelName,
210+
count: result.count))
208211
}
212+
case .failure(let error):
213+
return .failure(error)
209214
}
210215
} catch {
211-
completion(.failure(causedBy: error))
216+
return .failure(causedBy: error)
212217
}
213218
}
214219

@@ -321,9 +326,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
321326
paginationInput: QueryPaginationInput? = nil,
322327
eagerLoad: Bool = true,
323328
completion: DataStoreCallback<[M]>) {
329+
completion(query(modelType,
330+
modelSchema: modelSchema,
331+
predicate: predicate,
332+
sort: sort,
333+
paginationInput: paginationInput,
334+
eagerLoad: eagerLoad))
335+
}
336+
337+
private func query<M: Model>(_ modelType: M.Type,
338+
modelSchema: ModelSchema,
339+
predicate: QueryPredicate? = nil,
340+
sort: [QuerySortDescriptor]? = nil,
341+
paginationInput: QueryPaginationInput? = nil,
342+
eagerLoad: Bool = true) -> DataStoreResult<[M]> {
324343
guard let connection = connection else {
325-
completion(.failure(DataStoreError.nilSQLiteConnection()))
326-
return
344+
return .failure(DataStoreError.nilSQLiteConnection())
327345
}
328346
do {
329347
let statement = SelectStatement(from: modelSchema,
@@ -336,9 +354,9 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
336354
withSchema: modelSchema,
337355
using: statement,
338356
eagerLoad: eagerLoad)
339-
completion(.success(result))
357+
return .success(result)
340358
} catch {
341-
completion(.failure(causedBy: error))
359+
return .failure(causedBy: error)
342360
}
343361
}
344362

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -208,31 +208,41 @@ final class StorageEngine: StorageEngineBehavior {
208208
completion(.failure(causedBy: dataStoreError))
209209
}
210210

211-
let wrappedCompletion: DataStoreCallback<M> = { result in
212-
guard modelSchema.isSyncable, let syncEngine = self.syncEngine else {
213-
completion(result)
214-
return
215-
}
216-
217-
guard case .success(let savedModel) = result else {
218-
completion(result)
219-
return
211+
do {
212+
try storageAdapter.transaction {
213+
let result = self.storageAdapter.save(model,
214+
modelSchema: modelSchema,
215+
condition: condition,
216+
eagerLoad: eagerLoad)
217+
guard modelSchema.isSyncable else {
218+
completion(result)
219+
return
220+
}
221+
222+
guard case .success(let savedModel) = result else {
223+
completion(result)
224+
return
225+
}
226+
227+
guard let syncEngine else {
228+
let message = "No SyncEngine available to sync mutation event, rollback save."
229+
self.log.verbose("\(#function) \(message) : \(savedModel)")
230+
throw DataStoreError.internalOperation(
231+
message,
232+
"`DataStore.save()` was interrupted. `DataStore.stop()` may have been called.",
233+
nil)
234+
}
235+
self.log.verbose("\(#function) syncing mutation for \(savedModel)")
236+
self.syncMutation(of: savedModel,
237+
modelSchema: modelSchema,
238+
mutationType: mutationType,
239+
predicate: condition,
240+
syncEngine: syncEngine,
241+
completion: completion)
220242
}
221-
222-
self.log.verbose("\(#function) syncing mutation for \(savedModel)")
223-
self.syncMutation(of: savedModel,
224-
modelSchema: modelSchema,
225-
mutationType: mutationType,
226-
predicate: condition,
227-
syncEngine: syncEngine,
228-
completion: completion)
243+
} catch {
244+
completion(.failure(causedBy: error))
229245
}
230-
231-
storageAdapter.save(model,
232-
modelSchema: modelSchema,
233-
condition: condition,
234-
eagerLoad: eagerLoad,
235-
completion: wrappedCompletion)
236246
}
237247

238248
func save<M: Model>(_ model: M,

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineAdapter.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ protocol StorageEngineAdapter: AnyObject, ModelStorageBehavior, ModelStorageErro
3434

3535
// MARK: - Synchronous APIs
3636

37+
func save<M: Model>(_ model: M,
38+
modelSchema: ModelSchema,
39+
condition: QueryPredicate?,
40+
eagerLoad: Bool) -> DataStoreResult<M>
41+
3742
func exists(_ modelSchema: ModelSchema,
3843
withIdentifier id: ModelIdentifierProtocol,
3944
predicate: QueryPredicate?) throws -> Bool

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventSource.swift

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,55 @@ import Amplify
99
import Combine
1010

1111
extension AWSMutationDatabaseAdapter: MutationEventSource {
12+
13+
/// DataStore implements a FIFO queue of MutationEvents by using the local database
14+
/// and querying for the earliest MutationEvent by its `createdAt` field.
15+
///
16+
/// **Note**: In a previous revision of this code, this query used to filter on `InProcess` == `false` MutationEvents.
17+
/// This was to skip over already in-flight mutation events and grab the next one. However, it was observed in highly
18+
/// concurrent calls to `DataStore.start()` /`stop()` / `save()` that it will interrupt the
19+
/// **OutgoingMutationQueue** of processing and deleting a **MutationEvent** . `DataStore.start()`,
20+
/// which starts the remote sync engine, should perform a step to move all `InProcess` **MutationEvents** back
21+
/// to false, however there's a timing issue that is difficult to pinpoint. **OutgoingMutationQueue**'s query manages
22+
/// to pick up the second MutationEvent in the queue and sends it off, while the first one that is marked as `inProcess`
23+
/// isn't being processed, likely that process was already cancelled. The query below was updated to always dequeue the
24+
/// first regardless of `InProcess` in the [PR #3492](https://github.com/aws-amplify/amplify-swift/pull/3492).
25+
/// By removing the filter, there is a small chance that the same event may be sent twice. Sending the event twice is idempotent
26+
/// and the second response will be `ConditionalCheckFailed`. The `InProcess` flag is still needed for the
27+
/// handling consecutive update scenarios.
28+
///
29+
/// - Parameter completion: The first MutationEvent in the FIFO queue.
1230
func getNextMutationEvent(completion: @escaping DataStoreCallback<MutationEvent>) {
1331
log.verbose(#function)
1432

1533
guard let storageAdapter = storageAdapter else {
1634
completion(.failure(DataStoreError.nilStorageAdapter()))
1735
return
1836
}
19-
20-
let fields = MutationEvent.keys
21-
let predicate = fields.inProcess == false || fields.inProcess == nil
2237
let sort = QuerySortDescriptor(fieldName: MutationEvent.keys.createdAt.stringValue, order: .ascending)
23-
storageAdapter.query(MutationEvent.self,
24-
predicate: predicate,
25-
sort: [sort],
26-
paginationInput: nil,
27-
eagerLoad: true) { result in
28-
switch result {
29-
case .failure(let dataStoreError):
30-
completion(.failure(dataStoreError))
31-
case .success(let mutationEvents):
32-
guard let notInProcessEvent = mutationEvents.first else {
33-
self.nextEventPromise.set(completion)
34-
return
35-
}
36-
self.markInProcess(mutationEvent: notInProcessEvent,
37-
storageAdapter: storageAdapter,
38-
completion: completion)
39-
}
38+
storageAdapter.query(
39+
MutationEvent.self,
40+
predicate: nil,
41+
sort: [sort],
42+
paginationInput: nil,
43+
eagerLoad: true) { result in
44+
switch result {
45+
case .failure(let dataStoreError):
46+
completion(.failure(dataStoreError))
47+
case .success(let mutationEvents):
48+
guard let mutationEvent = mutationEvents.first else {
49+
self.nextEventPromise.set(completion)
50+
return
51+
}
52+
if mutationEvent.inProcess {
53+
log.verbose("The head of the MutationEvent queue was already inProcess (most likely interrupted process): \(mutationEvent)")
54+
completion(.success(mutationEvent))
55+
} else {
56+
self.markInProcess(mutationEvent: mutationEvent,
57+
storageAdapter: storageAdapter,
58+
completion: completion)
59+
}
60+
}
4061

4162
}
4263
}

AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsPostComment4V2Tests.swift

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,48 @@ final class StorageEngineTestsPostComment4V2Tests: StorageEngineTestsBase, Share
5050
}
5151
}
5252

53+
func testSavePostAndSyncSuccess() async throws {
54+
let receivedMutationEvent = expectation(description: "Mutation Events submitted to sync engine")
55+
let expectedSuccess = expectation(description: "Simulated success on mutation event submitted to sync engine")
56+
let post = ParentPost4V2(
57+
id: "postId1",
58+
title: "title1")
59+
60+
syncEngine.setCallbackOnSubmit { submittedMutationEvent, completion in
61+
receivedMutationEvent.fulfill()
62+
if submittedMutationEvent.modelId == post.id {
63+
expectedSuccess.fulfill()
64+
completion(.success(submittedMutationEvent))
65+
} else {
66+
XCTFail("Unexpected submitted MutationEvent \(submittedMutationEvent)")
67+
completion(.failure(.internalOperation("mockError", "", nil)))
68+
}
69+
}
70+
try await saveAsync(post)
71+
await fulfillment(of: [receivedMutationEvent, expectedSuccess], timeout: 1)
72+
73+
}
74+
75+
/// A save should fail if the corresponding MutationEvent could not be submitted to the syncEngine.
76+
func testSavePostFailDueToSyncEngineMissing() async throws {
77+
storageEngine.syncEngine = nil
78+
do {
79+
try await saveAsync(
80+
ParentPost4V2(
81+
id: "postId1",
82+
title: "title1"))
83+
XCTFail("Expected to fail when sync engine is `nil`")
84+
} catch {
85+
guard let dataStoreError = error as? DataStoreError else {
86+
XCTFail("Unexpected type of error \(error)")
87+
return
88+
}
89+
XCTAssertEqual(
90+
dataStoreError.errorDescription,
91+
"No SyncEngine available to sync mutation event, rollback save.")
92+
}
93+
}
94+
5395
func testSaveCommentThenQueryComment() async throws {
5496
let comment = ChildComment4V2(content: "content")
5597
let savedComment = try await saveAsync(comment)

0 commit comments

Comments
 (0)