Skip to content

Commit 430d53f

Browse files
author
Di Wu
authored
fix(datastore-v1): mutation event got ignored while executing parallel saving (#2782)
* fix(datastore-v1): mutation event got ignored while executing parallel saving * refactor: add ifSome operator for optaional
1 parent f659187 commit 430d53f

File tree

6 files changed

+173
-12
lines changed

6 files changed

+173
-12
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
9+
import Foundation
10+
11+
extension Optional {
12+
///
13+
/// Performing side effect function when data is exist
14+
/// - parameters:
15+
/// - then: a closure that takes wrapped data as a parameter
16+
@_spi(OptionalExtension)
17+
public func ifSome(_ then: (Wrapped) throws -> Void) rethrows {
18+
if case .some(let wrapped) = self {
19+
try then(wrapped)
20+
}
21+
}
22+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// SPDX-License-Identifier: Apache-2.0
66
//
77

8-
import Amplify
8+
@_spi(OptionalExtension) import Amplify
99
import Combine
1010
import Foundation
1111

@@ -208,24 +208,31 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
208208

209209
/// Saves the deconflicted mutationEvent, invokes `nextEventPromise` if it exists, and the save was successful,
210210
/// and finally invokes the completion promise from the future of the original invocation of `submit`
211-
func save(mutationEvent: MutationEvent,
212-
storageAdapter: StorageEngineAdapter,
213-
completionPromise: @escaping Future<MutationEvent, DataStoreError>.Promise) {
214-
211+
func save(
212+
mutationEvent: MutationEvent,
213+
storageAdapter: StorageEngineAdapter,
214+
completionPromise: @escaping Future<MutationEvent, DataStoreError>.Promise
215+
) {
215216
log.verbose("\(#function) mutationEvent: \(mutationEvent)")
217+
let nextEventPromise = self.nextEventPromise.getAndSet(nil)
216218
var eventToPersist = mutationEvent
217-
if nextEventPromise.get() != nil {
219+
if nextEventPromise != nil {
218220
eventToPersist.inProcess = true
219221
}
222+
220223
storageAdapter.save(eventToPersist, condition: nil) { result in
221224
switch result {
222225
case .failure(let dataStoreError):
223226
self.log.verbose("\(#function): Error saving mutation event: \(dataStoreError)")
227+
// restore the `nextEventPromise` value when failed to save mutation event
228+
// as nextEventPromise is expecting to hanlde error of querying unprocessed mutaiton events
229+
// not the failure of saving mutaiton event operation
230+
nextEventPromise.ifSome(self.nextEventPromise.set(_:))
224231
case .success(let savedMutationEvent):
225232
self.log.verbose("\(#function): saved \(savedMutationEvent)")
226-
if let nextEventPromise = self.nextEventPromise.getAndSet(nil) {
233+
nextEventPromise.ifSome {
227234
self.log.verbose("\(#function): invoking nextEventPromise with \(savedMutationEvent)")
228-
nextEventPromise(.success(savedMutationEvent))
235+
$0(.success(savedMutationEvent))
229236
}
230237
}
231238
self.log.verbose("\(#function): invoking completionPromise with \(result)")

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreEndToEndTests.swift

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,63 @@ class DataStoreEndToEndTests: SyncEngineIntegrationTestBase {
679679
sink.cancel()
680680
}
681681

682+
///
683+
/// - Given: DataStore with clean state
684+
/// - When:
685+
/// - do some mutaions to ensure MutationEventPublisher works fine
686+
/// - wait 1 second for OutgoingMutationQueue stateMachine transform to waitingForEventToProcess state
687+
/// - do some mutations in parallel
688+
/// - Then: verify no mutaiton loss
689+
func testParallelMutations_whenWaitingForEventToProcess_noMutationLoss() throws {
690+
setUp(withModels: TestModelRegistration())
691+
try startAmplifyAndWaitForReady()
692+
let parallelSize = 100
693+
let initExpectation = expectation(description: "expect MutationEventPublisher works fine")
694+
let parallelExpectation = expectation(description: "expect parallel processing no data loss")
695+
696+
let newPost = Post(title: UUID().uuidString, content: UUID().uuidString, createdAt: .now())
697+
698+
let titlePrefix = UUID().uuidString
699+
let posts = (0 ..< parallelSize).map {
700+
Post(title: "\(titlePrefix)-\($0)", content: UUID().uuidString, createdAt: .now())
701+
}
702+
var expectedResult = Set<String>()
703+
let cancellable = Amplify.Hub.publisher(for: .dataStore)
704+
.sink { payload in
705+
let event = DataStoreHubEvent(payload: payload)
706+
switch event {
707+
case .outboxMutationProcessed(let mutationEvent):
708+
guard mutationEvent.modelName == Post.modelName,
709+
let post = mutationEvent.element.model as? Post
710+
else { break }
711+
712+
if post.title == newPost.title {
713+
initExpectation.fulfill()
714+
}
715+
716+
if post.title.hasPrefix(titlePrefix) {
717+
expectedResult.insert(post.title)
718+
}
719+
720+
if expectedResult.count == parallelSize {
721+
parallelExpectation.fulfill()
722+
}
723+
default: break
724+
}
725+
}
726+
_ = Amplify.DataStore.save(newPost)
727+
wait(for: [initExpectation], timeout: 5)
728+
wait(for: 1)
729+
730+
for post in posts {
731+
_ = Amplify.DataStore.save(post)
732+
}
733+
734+
wait(for: [parallelExpectation], timeout: Double(parallelSize))
735+
cancellable.cancel()
736+
XCTAssertEqual(expectedResult, Set(posts.map(\.title)))
737+
}
738+
682739
// MARK: - Helpers
683740

684741
func validateSavePost() throws {

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/TestSupport/SyncEngineIntegrationTestBase.swift

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,16 @@ class SyncEngineIntegrationTestBase: DataStoreTestBase {
135135
}
136136

137137
}
138+
139+
extension XCTestCase {
140+
141+
func wait(for seconds: TimeInterval) {
142+
let waitExpectation = expectation(description: "Waiting")
143+
144+
let when = DispatchTime.now() + seconds
145+
DispatchQueue.main.asyncAfter(deadline: when) {
146+
waitExpectation.fulfill()
147+
}
148+
wait(for: [waitExpectation], timeout: seconds + 0.5)
149+
}
150+
}

AmplifyPlugins/DataStore/Podfile.lock

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,16 @@ CHECKOUT OPTIONS:
105105
:tag: 2.1.0
106106

107107
SPEC CHECKSUMS:
108-
Amplify: d0c5ec982b3448d158980577fe73a3aa6d27d984
109-
AmplifyPlugins: 20fce06296f5946bdc2d3aab93f3b81f813abb03
110-
AmplifyTestCommon: c28a34e5f412f4d2acbb9f7b250b603fc8d9f4be
108+
Amplify: c1341383bbaf4a2576802eaee7cb5a7ec15d7795
109+
AmplifyPlugins: 807bdcbdad2eb83d742e5df413eacbd8a3cfad37
110+
AmplifyTestCommon: b8097d1abdb02919414245d19a2f5c4382dac6af
111111
AppSyncRealTimeClient: ec19a24f635611b193eb98a2da573abcf98b793b
112112
AWSAuthCore: 88e77e867b210e5d09e35a484de19753d587aee3
113113
AWSCognitoIdentityProvider: 37ff510e8f64dc6a1240088ba92ad4d6f0cd841e
114114
AWSCognitoIdentityProviderASF: f2cd19990c4ae642ad73d09a4945018a994c9ff8
115115
AWSCore: 493e49f8118e04fa57d927ceb117ba24a9b5ca02
116116
AWSMobileClient: 36d9bb90118da3ba14a87f9999818cb314953c5c
117-
AWSPluginsCore: a3033b4838dd83e62efc3c7eee660e897485441c
117+
AWSPluginsCore: 50d53c4ac99f1d97f893cff748c32ac11668a7eb
118118
CwlCatchException: 86760545af2a490a23e964d76d7c77442dbce79b
119119
CwlCatchExceptionSupport: a004322095d7101b945442c86adc7cec0650f676
120120
CwlMachBadInstructionHandler: aa1fe9f2d08b29507c150d099434b2890247e7f8
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
@_spi(OptionalExtension) import Amplify
9+
import XCTest
10+
11+
class OptionalExtensionTests: XCTestCase {
12+
13+
/// - Given:
14+
/// Optional of integer with none
15+
/// - When:
16+
/// apply a function to increase a captured local integer variable with value 0
17+
/// - Then:
18+
/// - the ifSome function on optional should not be applied
19+
/// - local integer variable value stays 0
20+
func testIfSome_withNone_doNothing() {
21+
var sideEffect = 0
22+
let optional: Int? = .none
23+
optional.ifSome { _ in sideEffect += 1 }
24+
XCTAssertEqual(0, sideEffect)
25+
}
26+
27+
/// - Given:
28+
/// Optional of integer with value 10
29+
/// - When:
30+
/// apply a function to increase a captured local integer variable with value 0
31+
/// - Then:
32+
/// - the ifSome function on optioanl should be applied
33+
/// - capture local integer value equals 1
34+
func testIfSome_withValue_applyFunction() {
35+
var sideEffect = 0
36+
let optional: Int? = .some(10)
37+
optional.ifSome { _ in sideEffect += 1 }
38+
XCTAssertEqual(1, sideEffect)
39+
}
40+
41+
/// - Given:
42+
/// Optional of integer with value 10
43+
/// - When:
44+
/// apply a function that throw error
45+
/// - Then:
46+
/// - the ifSome function on optioanl should be applied
47+
/// - the error is rethrowed
48+
func testIfSome_withValue_applyFunctionRethrowError() {
49+
let optional: Int? = .some(10)
50+
let expectedError = TestRuntimeError()
51+
XCTAssertThrowsError(try optional.ifSome {_ in
52+
throw expectedError
53+
}) { error in
54+
XCTAssertEqual(expectedError, error as? TestRuntimeError)
55+
}
56+
}
57+
58+
}
59+
60+
fileprivate struct TestRuntimeError: Error, Equatable {
61+
let id = UUID()
62+
}

0 commit comments

Comments
 (0)