Skip to content

Commit a84c6df

Browse files
authored
fix(DataStore): ModelSyncedEventEmitter event order (#1494)
* fix(DataStore): ModelSyncedEventEmitter event order * fix(DataStore): funnel mutation events and modelSyncedEvent through datastore plugin * add more testing and name changes * address PR comments * address PR comments
1 parent ebb0aee commit a84c6df

26 files changed

+608
-216
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/AWSDataStorePlugin+DataStoreBaseBehavior.swift

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,15 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
180180
}
181181

182182
public func stop(completion: @escaping DataStoreCallback<Void>) {
183+
storageEngineInitSemaphore.wait()
183184
operationQueue.operations.forEach { operation in
184185
if let operation = operation as? DataStoreObserveQueryOperation {
185186
operation.resetState()
186187
}
187188
}
188-
storageEngineInitSemaphore.wait()
189+
dispatchedModelSyncedEvents.forEach { _, dispatchedModelSynced in
190+
dispatchedModelSynced.set(false)
191+
}
189192
if storageEngine == nil {
190193
storageEngineInitSemaphore.signal()
191194
completion(.successfulVoid)
@@ -199,12 +202,15 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
199202
}
200203

201204
public func clear(completion: @escaping DataStoreCallback<Void>) {
205+
storageEngineInitSemaphore.wait()
202206
operationQueue.operations.forEach { operation in
203207
if let operation = operation as? DataStoreObserveQueryOperation {
204208
operation.resetState()
205209
}
206210
}
207-
storageEngineInitSemaphore.wait()
211+
dispatchedModelSyncedEvents.forEach { _, dispatchedModelSynced in
212+
dispatchedModelSynced.set(false)
213+
}
208214
if storageEngine == nil {
209215
storageEngineInitSemaphore.signal()
210216
completion(.successfulVoid)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/AWSDataStorePlugin+DataStoreSubscribeBehavior.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,19 @@ extension AWSDataStorePlugin: DataStoreSubscribeBehavior {
5252
"`dataStorePublisher` is expected to exist for deployment targets >=iOS13.0",
5353
"", nil)).eraseToAnyPublisher()
5454
}
55+
guard let dispatchedModelSyncedEvent = dispatchedModelSyncedEvents[modelSchema.name] else {
56+
return Fail(error: DataStoreError.unknown(
57+
"`dispatchedModelSyncedEvent` is expected to exist for \(modelSchema.name)",
58+
"", nil)).eraseToAnyPublisher()
59+
}
5560
let operation = AWSDataStoreObserveQueryOperation(modelType: modelType,
5661
modelSchema: modelSchema,
5762
predicate: predicate,
5863
sortInput: sortInput,
5964
storageEngine: storageEngine,
6065
dataStorePublisher: dataStorePublisher,
61-
dataStoreConfiguration: dataStoreConfiguration)
66+
dataStoreConfiguration: dataStoreConfiguration,
67+
dispatchedModelSyncedEvent: dispatchedModelSyncedEvent)
6268
operationQueue.addOperation(operation)
6369
return operation.publisher
6470
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/AWSDataStorePlugin.swift

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
2323
/// The Publisher that sends mutation events to subscribers
2424
var dataStorePublisher: ModelSubcriptionBehavior?
2525

26+
var dispatchedModelSyncedEvents: [ModelName: AtomicValue<Bool>]
27+
2628
let modelRegistration: AmplifyModelRegistration
2729

2830
/// The DataStore configuration
@@ -70,6 +72,7 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
7072
} else {
7173
self.dataStorePublisher = nil
7274
}
75+
self.dispatchedModelSyncedEvents = [:]
7376
self.storageEngineInitSemaphore = DispatchSemaphore(value: 1)
7477
}
7578

@@ -88,6 +91,7 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
8891
self.storageEngineBehaviorFactory = storageEngineBehaviorFactory ??
8992
StorageEngine.init(isSyncEnabled:dataStoreConfiguration:validAPIPluginKey:validAuthPluginKey:modelRegistryVersion:userDefault:)
9093
self.dataStorePublisher = dataStorePublisher
94+
self.dispatchedModelSyncedEvents = [:]
9195
self.validAPIPluginKey = validAPIPluginKey
9296
self.validAuthPluginKey = validAuthPluginKey
9397
self.storageEngineInitSemaphore = DispatchSemaphore(value: 1)
@@ -98,6 +102,9 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
98102
/// them to `StorageEngine.setUp(modelSchemas:)`
99103
public func configure(using amplifyConfiguration: Any?) throws {
100104
modelRegistration.registerModels(registry: ModelRegistry.self)
105+
for modelSchema in ModelRegistry.modelSchemas {
106+
dispatchedModelSyncedEvents[modelSchema.name] = AtomicValue(initialValue: false)
107+
}
101108
resolveSyncEnabled()
102109
ModelListDecoderRegistry.registerDecoder(DataStoreListDecoder.self)
103110
}
@@ -165,7 +172,7 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
165172
.publisher
166173
.sink(
167174
receiveCompletion: { [weak self] in self?.onReceiveCompletion(completed: $0) },
168-
receiveValue: { [weak self] in self?.onRecieveValue(receiveValue: $0) }
175+
receiveValue: { [weak self] in self?.onReceiveValue(receiveValue: $0) }
169176
)
170177
}
171178

@@ -180,21 +187,36 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
180187
}
181188

182189
@available(iOS 13.0, *)
183-
private func onRecieveValue(receiveValue: StorageEngineEvent) {
190+
func onReceiveValue(receiveValue: StorageEngineEvent) {
184191
guard let dataStorePublisher = self.dataStorePublisher else {
185192
log.error("Data store publisher not initalized")
186193
return
187194
}
188195

189-
if case .mutationEvent(let mutationEvent) = receiveValue {
196+
switch receiveValue {
197+
case .started:
198+
break
199+
case .mutationEvent(let mutationEvent):
190200
dataStorePublisher.send(input: mutationEvent)
201+
case .modelSyncedEvent(let modelSyncedEvent):
202+
let modelSyncedEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
203+
data: modelSyncedEvent)
204+
Amplify.Hub.dispatch(to: .dataStore, payload: modelSyncedEventPayload)
205+
dispatchedModelSyncedEvents[modelSyncedEvent.modelName]?.set(true)
206+
case .syncQueriesReadyEvent:
207+
let syncQueriesReadyEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.syncQueriesReady)
208+
Amplify.Hub.dispatch(to: .dataStore, payload: syncQueriesReadyEventPayload)
209+
case .readyEvent:
210+
let readyEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.ready)
211+
Amplify.Hub.dispatch(to: .dataStore, payload: readyEventPayload)
191212
}
192213
}
193214

194215
public func reset(onComplete: @escaping (() -> Void)) {
195216
if operationQueue != nil {
196217
operationQueue = nil
197218
}
219+
dispatchedModelSyncedEvents = [:]
198220
if let listener = hubListener {
199221
Amplify.Hub.removeListener(listener)
200222
hubListener = nil

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Storage/SQLite/Statement+Model.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ extension Statement: StatementModelConvertible {
5858
var elements: [ModelValues] = []
5959

6060
// parse each row of the result
61-
let iter = self.makeIterator()
61+
let iter = makeIterator()
6262
while let row = try iter.failableNext() {
6363
let modelDictionary = try convert(row: row, withSchema: modelSchema, using: statement)
6464
elements.append(modelDictionary)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Storage/StorageEngine.swift

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,38 @@ final class StorageEngine: StorageEngineBehavior {
132132
}
133133

134134
@available(iOS 13.0, *)
135-
private func onReceive(receiveValue: RemoteSyncEngineEvent) {
136-
if case .mutationEvent(let mutationEvent) = receiveValue {
137-
self.storageEnginePublisher.send(.mutationEvent(mutationEvent))
135+
func onReceive(receiveValue: RemoteSyncEngineEvent) {
136+
switch receiveValue {
137+
case .storageAdapterAvailable:
138+
break
139+
case .subscriptionsPaused:
140+
break
141+
case .mutationsPaused:
142+
break
143+
case .clearedStateOutgoingMutations:
144+
break
145+
case .subscriptionsInitialized:
146+
break
147+
case .performedInitialSync:
148+
break
149+
case .subscriptionsActivated:
150+
break
151+
case .mutationQueueStarted:
152+
break
153+
case .syncStarted:
154+
break
155+
case .cleanedUp:
156+
break
157+
case .cleanedUpForTermination:
158+
break
159+
case .mutationEvent(let mutationEvent):
160+
storageEnginePublisher.send(.mutationEvent(mutationEvent))
161+
case .modelSyncedEvent(let modelSyncedEvent):
162+
storageEnginePublisher.send(.modelSyncedEvent(modelSyncedEvent))
163+
case .syncQueriesReadyEvent:
164+
storageEnginePublisher.send(.syncQueriesReadyEvent)
165+
case .readyEvent:
166+
storageEnginePublisher.send(.readyEvent)
138167
}
139168
}
140169

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Storage/StorageEngineBehavior.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import Combine
1212
enum StorageEngineEvent {
1313
case started
1414
case mutationEvent(MutationEvent)
15+
case modelSyncedEvent(ModelSyncedEvent)
16+
case syncQueriesReadyEvent
17+
case readyEvent
1518
}
1619

1720
protocol StorageEngineBehavior: AnyObject, ModelStorageBehavior {

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Subscribe/DataStoreObserveQueryOperation.swift

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
123123
let sortInput: [QuerySortDescriptor]?
124124
var storageEngine: StorageEngineBehavior
125125
var dataStorePublisher: ModelSubcriptionBehavior
126+
let dispatchedModelSyncedEvent: AtomicValue<Bool>
126127
let itemsChangedMaxSize: Int
127128

128129
let stopwatch: Stopwatch
@@ -140,18 +141,8 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
140141
return observeQueryPublisher.eraseToAnyPublisher()
141142
}
142143

143-
var isSynced: Bool {
144-
if let storageAdapter = storageEngine as? StorageEngine,
145-
let remoteSyncEngine = storageAdapter.syncEngine as? RemoteSyncEngine,
146-
let modelSyncedEventEmitter = remoteSyncEngine
147-
.syncEventEmitter?.modelSyncedEventEmitters[modelType.modelName] {
148-
return modelSyncedEventEmitter.dispatchedModelSyncedEvent
149-
}
150-
return false
151-
}
152-
153144
var currentSnapshot: DataStoreQuerySnapshot<M> {
154-
DataStoreQuerySnapshot<M>(items: currentItems.sortedModels, isSynced: isSynced)
145+
DataStoreQuerySnapshot<M>(items: currentItems.sortedModels, isSynced: dispatchedModelSyncedEvent.get())
155146
}
156147

157148
init(modelType: M.Type,
@@ -160,13 +151,15 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
160151
sortInput: [QuerySortDescriptor]?,
161152
storageEngine: StorageEngineBehavior,
162153
dataStorePublisher: ModelSubcriptionBehavior,
163-
dataStoreConfiguration: DataStoreConfiguration) {
154+
dataStoreConfiguration: DataStoreConfiguration,
155+
dispatchedModelSyncedEvent: AtomicValue<Bool>) {
164156
self.modelType = modelType
165157
self.modelSchema = modelSchema
166158
self.predicate = predicate
167159
self.sortInput = sortInput
168160
self.storageEngine = storageEngine
169161
self.dataStorePublisher = dataStorePublisher
162+
self.dispatchedModelSyncedEvent = dispatchedModelSyncedEvent
170163
self.itemsChangedMaxSize = Int(dataStoreConfiguration.syncPageSize)
171164
self.stopwatch = Stopwatch()
172165
self.observeQueryStarted = false
@@ -272,14 +265,14 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
272265

273266
func subscribeToItemChanges() {
274267
batchItemsChangedSink = dataStorePublisher.publisher
275-
.filter { _ in !self.isSynced }
268+
.filter { _ in !self.dispatchedModelSyncedEvent.get() }
276269
.filter(onItemChangedFilter(mutationEvent:))
277270
.collect(.byTimeOrCount(serialQueue, itemsChangedPeriodicPublishTimeInSeconds, itemsChangedMaxSize))
278271
.sink(receiveCompletion: onReceiveCompletion(completed:),
279272
receiveValue: onItemsChange(mutationEvents:))
280273

281274
itemsChangedSink = dataStorePublisher.publisher
282-
.filter { _ in self.isSynced }
275+
.filter { _ in self.dispatchedModelSyncedEvent.get() }
283276
.filter(onItemChangedFilter(mutationEvent:))
284277
.receive(on: serialQueue)
285278
.sink(receiveCompletion: onReceiveCompletion(completed:),

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/ModelSyncedEventEmitter.swift

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import Foundation
1313
enum IncomingModelSyncedEmitterEvent {
1414
case mutationEventApplied(MutationEvent)
1515
case mutationEventDropped(modelName: String)
16-
case modelSyncedEvent
16+
case modelSyncedEvent(ModelSyncedEvent)
1717
}
1818

1919
/// Listens to events published by both the `InitialSyncOrchestrator` and `IncomingEventReconciliationQueue`,
@@ -45,31 +45,12 @@ final class ModelSyncedEventEmitter {
4545
return modelSyncedEventTopic.eraseToAnyPublisher()
4646
}
4747

48-
var shouldDispatchModelSyncedEvent: Bool {
48+
var shouldSendModelSyncedEvent: Bool {
4949
initialSyncOperationFinished && reconciledReceived == recordsReceived
5050
}
5151

52-
/// Used internally within ModelSyncedEventEmitter instances, not thread-safe, is accessed serially under the
53-
/// DispatchQueue. Exists to avoid using `dispatchedModelSyncedEvent` which requires acquiring a lock each time.
54-
private var _dispatchedModelSyncedEvent: Bool
55-
56-
/// Used by other internal classes for checking state in a thread-safe way.
57-
var dispatchedModelSyncedEvent: Bool {
58-
get {
59-
dispatchedModelSyncedEventLock.lock()
60-
defer {
61-
dispatchedModelSyncedEventLock.unlock()
62-
}
63-
return _dispatchedModelSyncedEvent
64-
}
65-
set {
66-
dispatchedModelSyncedEventLock.lock()
67-
defer {
68-
dispatchedModelSyncedEventLock.unlock()
69-
}
70-
_dispatchedModelSyncedEvent = newValue
71-
}
72-
}
52+
/// Used within ModelSyncedEventEmitter instances, not thread-safe, is accessed serially under DispatchQueue.
53+
var dispatchedModelSyncedEvent: Bool
7354

7455
init(modelSchema: ModelSchema,
7556
initialSyncOrchestrator: InitialSyncOrchestrator?,
@@ -78,7 +59,7 @@ final class ModelSyncedEventEmitter {
7859
self.recordsReceived = 0
7960
self.reconciledReceived = 0
8061
self.initialSyncOperationFinished = false
81-
self._dispatchedModelSyncedEvent = false
62+
self.dispatchedModelSyncedEvent = false
8263
self.modelSyncedEventBuilder = ModelSyncedEvent.Builder()
8364

8465
self.modelSyncedEventTopic = PassthroughSubject<IncomingModelSyncedEmitterEvent, Never>()
@@ -137,13 +118,13 @@ final class ModelSyncedEventEmitter {
137118
case .finished:
138119
initialSyncOperationFinished = true
139120
if recordsReceived == 0 {
140-
dispatchModelSyncedEvent()
121+
sendModelSyncedEvent()
141122
}
142123
}
143124
}
144125

145126
private func onReceiveReconciliationEvent(value: IncomingEventReconciliationQueueEvent) {
146-
guard !_dispatchedModelSyncedEvent else {
127+
guard !dispatchedModelSyncedEvent else {
147128
switch value {
148129
case .mutationEventApplied(let event):
149130
modelSyncedEventTopic.send(.mutationEventApplied(event))
@@ -169,31 +150,29 @@ final class ModelSyncedEventEmitter {
169150
log.error("Unexpected mutationType received: \(event.mutationType)")
170151
}
171152

172-
if shouldDispatchModelSyncedEvent {
173-
dispatchModelSyncedEvent()
174-
}
175-
176153
modelSyncedEventTopic.send(.mutationEventApplied(event))
177-
case .mutationEventDropped(let modelName):
178-
reconciledReceived += 1
179154

180-
if shouldDispatchModelSyncedEvent {
181-
dispatchModelSyncedEvent()
155+
if shouldSendModelSyncedEvent {
156+
sendModelSyncedEvent()
182157
}
158+
case .mutationEventDropped(let modelName):
159+
reconciledReceived += 1
183160

184161
modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName))
162+
163+
if shouldSendModelSyncedEvent {
164+
sendModelSyncedEvent()
165+
}
185166
case .initialized, .started, .paused:
186167
return
187168
}
188169
}
189170

190-
private func dispatchModelSyncedEvent() {
171+
private func sendModelSyncedEvent() {
191172
modelSyncedEventBuilder.modelName = modelSchema.name
192-
let modelSyncedEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
193-
data: modelSyncedEventBuilder.build())
194-
Amplify.Hub.dispatch(to: .dataStore, payload: modelSyncedEventPayload)
173+
let modelSyncedEvent = modelSyncedEventBuilder.build()
174+
modelSyncedEventTopic.send(.modelSyncedEvent(modelSyncedEvent))
195175
dispatchedModelSyncedEvent = true
196-
modelSyncedEventTopic.send(.modelSyncedEvent)
197176
syncOrchestratorSink?.cancel()
198177
}
199178
}

0 commit comments

Comments
 (0)