Skip to content

Commit 8cc465b

Browse files
authored
fix(DataStore): Optimize mutation event propagation after model synced in ModelSyncEventEmitter (#1479)
* fix(DataStore): Optimize mutation event propagation after model synced in ModelSyncEventEmitter * address PR comments * address PR comments
1 parent e9e9c5d commit 8cc465b

File tree

4 files changed

+118
-22
lines changed

4 files changed

+118
-22
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Subscribe/DataStoreObserveQueryOperation.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
145145
let remoteSyncEngine = storageAdapter.syncEngine as? RemoteSyncEngine,
146146
let modelSyncedEventEmitter = remoteSyncEngine
147147
.syncEventEmitter?.modelSyncedEventEmitters[modelType.modelName] {
148-
return modelSyncedEventEmitter.dispatchedModelSyncedEvent.get()
148+
return modelSyncedEventEmitter.dispatchedModelSyncedEvent
149149
}
150150
return false
151151
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/ModelSyncedEventEmitter.swift

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ enum IncomingModelSyncedEmitterEvent {
2828
final class ModelSyncedEventEmitter {
2929
private let queue = DispatchQueue(label: "com.amazonaws.ModelSyncedEventEmitterQueue",
3030
target: DispatchQueue.global())
31+
private let dispatchedModelSyncedEventLock = NSLock()
3132

3233
private var syncOrchestratorSink: AnyCancellable?
3334
private var reconciliationQueueSink: AnyCancellable?
@@ -48,7 +49,27 @@ final class ModelSyncedEventEmitter {
4849
initialSyncOperationFinished && reconciledReceived == recordsReceived
4950
}
5051

51-
var dispatchedModelSyncedEvent: AtomicValue<Bool>
52+
/// Used internally within ModelSyncedEventEmitter instances, not thread-safe, is accessed serially under the
53+
/// DispatchQueue. Exists to avoid using `dispatchedModelSyncedEvent` which requires acquiring a lock each time.
54+
private var _dispatchedModelSyncedEvent: Bool
55+
56+
/// Used by other internal classes for checking state in a thread-safe way.
57+
var dispatchedModelSyncedEvent: Bool {
58+
get {
59+
dispatchedModelSyncedEventLock.lock()
60+
defer {
61+
dispatchedModelSyncedEventLock.unlock()
62+
}
63+
return _dispatchedModelSyncedEvent
64+
}
65+
set {
66+
dispatchedModelSyncedEventLock.lock()
67+
defer {
68+
dispatchedModelSyncedEventLock.unlock()
69+
}
70+
_dispatchedModelSyncedEvent = newValue
71+
}
72+
}
5273

5374
init(modelSchema: ModelSchema,
5475
initialSyncOrchestrator: InitialSyncOrchestrator?,
@@ -57,7 +78,7 @@ final class ModelSyncedEventEmitter {
5778
self.recordsReceived = 0
5879
self.reconciledReceived = 0
5980
self.initialSyncOperationFinished = false
60-
self.dispatchedModelSyncedEvent = AtomicValue(initialValue: false)
81+
self._dispatchedModelSyncedEvent = false
6182
self.modelSyncedEventBuilder = ModelSyncedEvent.Builder()
6283

6384
self.modelSyncedEventTopic = PassthroughSubject<IncomingModelSyncedEmitterEvent, Never>()
@@ -122,7 +143,7 @@ final class ModelSyncedEventEmitter {
122143
}
123144

124145
private func onReceiveReconciliationEvent(value: IncomingEventReconciliationQueueEvent) {
125-
guard !dispatchedModelSyncedEvent.get() else {
146+
guard !_dispatchedModelSyncedEvent else {
126147
switch value {
127148
case .mutationEventApplied(let event):
128149
modelSyncedEventTopic.send(.mutationEventApplied(event))
@@ -171,7 +192,7 @@ final class ModelSyncedEventEmitter {
171192
let modelSyncedEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
172193
data: modelSyncedEventBuilder.build())
173194
Amplify.Hub.dispatch(to: .dataStore, payload: modelSyncedEventPayload)
174-
dispatchedModelSyncedEvent.set(true)
195+
dispatchedModelSyncedEvent = true
175196
modelSyncedEventTopic.send(.modelSyncedEvent)
176197
syncOrchestratorSink?.cancel()
177198
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/InitialSync/ModelSyncedEventEmitterTests.swift

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,79 @@ class ModelSyncedEventEmitterTests: XCTestCase {
115115
emitterSink?.cancel()
116116
listener.cancel()
117117
}
118+
119+
/// Send randomly 80% of the events from `initialSyncOrchestrator`, and 20% of the events from
120+
/// `reconciliationQueue`. This replicates the scenario of concurrent events received by the emitter. Then finished
121+
/// event from `initialSyncOrchestrator`. Then concurrent send all events from `reconciliationQueue`, one of these
122+
/// events will resolve to dispatching the modelSyncedEvent, followed by the remaining events being sent back from
123+
/// the emitter's topic. This exercises that concurrent events do not cause a data race in the emitter.
124+
///
125+
/// - Given: Emitter is seeded with events from both the `initialSyncOrchestrator` and `reconcilliationQueue`,
126+
/// followed by a finished event from initialSyncOrchestrator
127+
/// - When:
128+
/// - Concurrently receive more events from the `reconcilliationQueue`
129+
/// - Then:
130+
/// - `modelSyncedEvent` received and emitter continues to send mutation events
131+
///
132+
func testConcurrent() throws {
133+
let modelSyncedReceived = expectation(description: "modelSynced received")
134+
let mutationEventAppliedReceived = expectation(description: "mutationEventApplied received")
135+
mutationEventAppliedReceived.assertForOverFulfill = false
136+
let mutationEventDroppedReceived = expectation(description: "mutationEventDropped received")
137+
mutationEventDroppedReceived.assertForOverFulfill = false
138+
let anyPostMetadata = MutationSyncMetadata(id: "1",
139+
deleted: false,
140+
lastChangedAt: Int(Date().timeIntervalSince1970),
141+
version: 1)
142+
let testPost = Post(id: "1", title: "post1", content: "content", createdAt: .now())
143+
let anyPost = AnyModel(testPost)
144+
let anyPostMutationSync = MutationSync<AnyModel>(model: anyPost, syncMetadata: anyPostMetadata)
145+
let postMutationEvent = try MutationEvent(untypedModel: testPost, mutationType: .create)
146+
147+
let emitter = ModelSyncedEventEmitter(modelSchema: Post.schema,
148+
initialSyncOrchestrator: initialSyncOrchestrator,
149+
reconciliationQueue: reconciliationQueue)
150+
151+
initialSyncOrchestrator?.initialSyncOrchestratorTopic.send(.started(modelName: Post.modelName,
152+
syncType: .fullSync))
153+
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
154+
let index = Int.random(in: 1 ... 10)
155+
if index == 1 {
156+
reconciliationQueue?.incomingEventSubject.send(.mutationEventApplied(postMutationEvent))
157+
} else if index == 2 {
158+
reconciliationQueue?.incomingEventSubject.send(.mutationEventDropped(modelName: Post.modelName))
159+
} else {
160+
initialSyncOrchestrator?.initialSyncOrchestratorTopic.send(.enqueued(anyPostMutationSync,
161+
modelName: Post.modelName))
162+
}
163+
164+
}
165+
initialSyncOrchestrator?.initialSyncOrchestratorTopic.send(.finished(modelName: Post.modelName))
166+
167+
var emitterSink: AnyCancellable?
168+
emitterSink = emitter.publisher.sink { _ in
169+
XCTFail("Should not have completed")
170+
} receiveValue: { value in
171+
switch value {
172+
case .modelSyncedEvent:
173+
modelSyncedReceived.fulfill()
174+
case .mutationEventApplied:
175+
mutationEventAppliedReceived.fulfill()
176+
case .mutationEventDropped:
177+
mutationEventDroppedReceived.fulfill()
178+
}
179+
}
180+
181+
DispatchQueue.concurrentPerform(iterations: 3_000) { _ in
182+
let index = Int.random(in: 1 ... 2)
183+
if index == 1 {
184+
reconciliationQueue?.incomingEventSubject.send(.mutationEventApplied(postMutationEvent))
185+
} else if index == 2 {
186+
reconciliationQueue?.incomingEventSubject.send(.mutationEventDropped(modelName: Post.modelName))
187+
}
188+
}
189+
190+
waitForExpectations(timeout: 10)
191+
emitterSink?.cancel()
192+
}
118193
}

AmplifyPlugins/DataStore/Podfile.lock

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@ PODS:
2323
- AWSPluginsCore (= 1.15.2)
2424
- AppSyncRealTimeClient (1.5.0):
2525
- Starscream (~> 3.1.1)
26-
- AWSAuthCore (2.26.1):
27-
- AWSCore (= 2.26.1)
28-
- AWSCognitoIdentityProvider (2.26.1):
29-
- AWSCognitoIdentityProviderASF (= 2.26.1)
30-
- AWSCore (= 2.26.1)
31-
- AWSCognitoIdentityProviderASF (2.26.1)
32-
- AWSCore (2.26.1)
33-
- AWSMobileClient (2.26.1):
34-
- AWSAuthCore (= 2.26.1)
35-
- AWSCognitoIdentityProvider (= 2.26.1)
36-
- AWSCognitoIdentityProviderASF (= 2.26.1)
37-
- AWSCore (= 2.26.1)
26+
- AWSAuthCore (2.26.2):
27+
- AWSCore (= 2.26.2)
28+
- AWSCognitoIdentityProvider (2.26.2):
29+
- AWSCognitoIdentityProviderASF (= 2.26.2)
30+
- AWSCore (= 2.26.2)
31+
- AWSCognitoIdentityProviderASF (2.26.2)
32+
- AWSCore (2.26.2)
33+
- AWSMobileClient (2.26.2):
34+
- AWSAuthCore (= 2.26.2)
35+
- AWSCognitoIdentityProvider (= 2.26.2)
36+
- AWSCognitoIdentityProviderASF (= 2.26.2)
37+
- AWSCore (= 2.26.2)
3838
- AWSPluginsCore (1.15.2):
3939
- Amplify (= 1.15.2)
4040
- AWSCore (~> 2.26.1)
@@ -103,11 +103,11 @@ SPEC CHECKSUMS:
103103
AmplifyPlugins: bf1558a4efb6ed94b80eff7a20fda020322409e3
104104
AmplifyTestCommon: 283bd8bf694569b1dda7f40f40a8ad1f97784eeb
105105
AppSyncRealTimeClient: 2b4482b1770a3e5cf64f9714a6d198550017b5a2
106-
AWSAuthCore: 264faaa6af5990af2c77effe8f398a4b80e6c44e
107-
AWSCognitoIdentityProvider: 5df455775b57eb4e61862acd88d93e56113950c4
108-
AWSCognitoIdentityProviderASF: b2c180f69537d57ff485f7314fd266032ca3f898
109-
AWSCore: 0f855e20ccb13e028932b737f0706023a9561399
110-
AWSMobileClient: ba255030a481b8a123b21585548e761456efc64b
106+
AWSAuthCore: c785a33af10a45aab6456a5fa7150f759dad836f
107+
AWSCognitoIdentityProvider: 0bea89a822d7dd76c7dc62212561a1074dee0031
108+
AWSCognitoIdentityProviderASF: 425d7f40fb6a520f8437a5f8b991215239ec620b
109+
AWSCore: dd7f74ab3f354aad435f83afdbfac91bcb7d44c2
110+
AWSMobileClient: fc20c58eaad166b3d2a569ef9ce691d6fed0c5dd
111111
AWSPluginsCore: ff09e8b86b7969327a1ba0b8c6946d83008cbf81
112112
CwlCatchException: 70a52ae44ea5d46db7bd385f801a94942420cd8c
113113
CwlPreconditionTesting: d33a4e4f285c0b885fddcae5dfedfbb34d4f3961

0 commit comments

Comments
 (0)