Skip to content

Commit 7873d56

Browse files
author
Di Wu
authored
fix(datastore): mutation event got ignored while executing parallel saving (#2781)
* fix(datastore): mutation event got ignored while executing parallel saving * fix(datastore): refactor with extension peek method * test(core): add test cases for extension method peek on Optional * fix(core): rename optional peek method to ifSome
1 parent 7e3b4c3 commit 7873d56

File tree

4 files changed

+157
-8
lines changed

4 files changed

+157
-8
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
9+
import Foundation
10+
11+
extension Optional {
12+
///
13+
/// Performing side effect function when data is exist
14+
/// - parameters:
15+
/// - then: a closure that takes wrapped data as a parameter
16+
@_spi(OptionalExtension)
17+
public func ifSome(_ then: (Wrapped) throws -> Void) rethrows {
18+
if case .some(let wrapped) = self {
19+
try then(wrapped)
20+
}
21+
}
22+
}

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// SPDX-License-Identifier: Apache-2.0
66
//
77

8-
import Amplify
8+
@_spi(OptionalExtension) import Amplify
99
import Combine
1010
import Foundation
1111

@@ -215,24 +215,31 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
215215

216216
/// Saves the deconflicted mutationEvent, invokes `nextEventPromise` if it exists, and the save was successful,
217217
/// and finally invokes the completion promise from the future of the original invocation of `submit`
218-
func save(mutationEvent: MutationEvent,
219-
storageAdapter: StorageEngineAdapter,
220-
completionPromise: @escaping Future<MutationEvent, DataStoreError>.Promise) {
221-
218+
func save(
219+
mutationEvent: MutationEvent,
220+
storageAdapter: StorageEngineAdapter,
221+
completionPromise: @escaping Future<MutationEvent, DataStoreError>.Promise
222+
) {
222223
log.verbose("\(#function) mutationEvent: \(mutationEvent)")
224+
let nextEventPromise = self.nextEventPromise.getAndSet(nil)
223225
var eventToPersist = mutationEvent
224-
if nextEventPromise.get() != nil {
226+
if nextEventPromise != nil {
225227
eventToPersist.inProcess = true
226228
}
229+
227230
storageAdapter.save(eventToPersist, condition: nil, eagerLoad: true) { result in
228231
switch result {
229232
case .failure(let dataStoreError):
230233
self.log.verbose("\(#function): Error saving mutation event: \(dataStoreError)")
234+
// restore the `nextEventPromise` value when failed to save mutation event
235+
// as nextEventPromise is expecting to hanlde error of querying unprocessed mutaiton events
236+
// not the failure of saving mutaiton event operation
237+
nextEventPromise.ifSome(self.nextEventPromise.set(_:))
231238
case .success(let savedMutationEvent):
232239
self.log.verbose("\(#function): saved \(savedMutationEvent)")
233-
if let nextEventPromise = self.nextEventPromise.getAndSet(nil) {
240+
nextEventPromise.ifSome {
234241
self.log.verbose("\(#function): invoking nextEventPromise with \(savedMutationEvent)")
235-
nextEventPromise(.success(savedMutationEvent))
242+
$0(.success(savedMutationEvent))
236243
}
237244
}
238245
self.log.verbose("\(#function): invoking completionPromise with \(result)")

AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreEndToEndTests.swift

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,64 @@ class DataStoreEndToEndTests: SyncEngineIntegrationTestBase {
479479
try await Amplify.DataStore.start()
480480
}
481481

482+
///
483+
/// - Given: DataStore with clean state
484+
/// - When:
485+
/// - do some mutaions to ensure MutationEventPublisher works fine
486+
/// - wait 1 second for OutgoingMutationQueue stateMachine transform to waitingForEventToProcess state
487+
/// - do some mutations in parallel
488+
/// - Then: verify no mutaiton loss
489+
func testParallelMutations_whenWaitingForEventToProcess_noMutationLoss() async throws {
490+
await setUp(withModels: TestModelRegistration())
491+
try await startAmplifyAndWaitForSync()
492+
let parallelSize = 100
493+
let initExpectation = expectation(description: "expect MutationEventPublisher works fine")
494+
let parallelExpectation = expectation(description: "expect parallel processing no data loss")
495+
496+
let newPost = Post(title: UUID().uuidString, content: UUID().uuidString, createdAt: .now())
497+
498+
let titlePrefix = UUID().uuidString
499+
let posts = (0..<parallelSize).map { Post(title: "\(titlePrefix)-\($0)", content: UUID().uuidString, createdAt: .now()) }
500+
var expectedResult = Set<String>()
501+
let cancellable = Amplify.Hub.publisher(for: .dataStore)
502+
.sink { payload in
503+
let event = DataStoreHubEvent(payload: payload)
504+
switch event {
505+
case .outboxMutationProcessed(let mutationEvent):
506+
guard mutationEvent.modelName == Post.modelName,
507+
let post = mutationEvent.element.model as? Post
508+
else
509+
{ break }
510+
511+
if post.title == newPost.title {
512+
initExpectation.fulfill()
513+
}
514+
515+
if post.title.hasPrefix(titlePrefix) {
516+
expectedResult.insert(post.title)
517+
}
518+
519+
if expectedResult.count == parallelSize {
520+
parallelExpectation.fulfill()
521+
}
522+
default: break
523+
}
524+
}
525+
try await Amplify.DataStore.save(newPost)
526+
wait(for: [initExpectation], timeout: 5)
527+
try await Task.sleep(seconds: 1)
528+
529+
for post in posts {
530+
Task {
531+
try? await Amplify.DataStore.save(post)
532+
}
533+
}
534+
535+
wait(for: [parallelExpectation], timeout: Double(parallelSize))
536+
cancellable.cancel()
537+
XCTAssertEqual(expectedResult, Set(posts.map(\.title)))
538+
}
539+
482540
// MARK: - Helpers
483541

484542
func validateSavePost() async throws {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
@_spi(OptionalExtension) import Amplify
9+
import XCTest
10+
11+
class OptionalExtensionTests: XCTestCase {
12+
13+
/// - Given:
14+
/// Optional of integer with none
15+
/// - When:
16+
/// apply a function to increase a captured local integer variable with value 0
17+
/// - Then:
18+
/// - the ifSome function on optional should not be applied
19+
/// - local integer variable value stays 0
20+
func testIfSome_withNone_doNothing() {
21+
var sideEffect = 0
22+
let optional: Int? = .none
23+
optional.ifSome { _ in sideEffect += 1 }
24+
XCTAssertEqual(0, sideEffect)
25+
}
26+
27+
/// - Given:
28+
/// Optional of integer with value 10
29+
/// - When:
30+
/// apply a function to increase a captured local integer variable with value 0
31+
/// - Then:
32+
/// - the ifSome function on optioanl should be applied
33+
/// - capture local integer value equals 1
34+
func testIfSome_withValue_applyFunction() {
35+
var sideEffect = 0
36+
let optional: Int? = .some(10)
37+
optional.ifSome { _ in sideEffect += 1 }
38+
XCTAssertEqual(1, sideEffect)
39+
}
40+
41+
/// - Given:
42+
/// Optional of integer with value 10
43+
/// - When:
44+
/// apply a function that throw error
45+
/// - Then:
46+
/// - the ifSome function on optioanl should be applied
47+
/// - the error is rethrowed
48+
func testIfSome_withValue_applyFunctionRethrowError() {
49+
let optional: Int? = .some(10)
50+
let expectedError = TestRuntimeError()
51+
XCTAssertThrowsError(try optional.ifSome {_ in
52+
throw expectedError
53+
}) { error in
54+
XCTAssertEqual(expectedError, error as? TestRuntimeError)
55+
}
56+
}
57+
58+
}
59+
60+
fileprivate struct TestRuntimeError: Error, Equatable {
61+
let id = UUID()
62+
}

0 commit comments

Comments
 (0)