Skip to content

Commit cc77e86

Browse files
authored
fix(DataStore): ModelSyncedEvent before last MutationEvent (#1472)
* fix(DataStore): ModelSyncedEvent before last MutationEvent * address PR comments
1 parent 53b828a commit cc77e86

File tree

8 files changed

+251
-39
lines changed

8 files changed

+251
-39
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/ModelSyncedEventEmitter.swift

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,21 @@ import AWSPluginsCore
1010
import Combine
1111
import Foundation
1212

13-
@available(iOS 13.0, *)
13+
enum IncomingModelSyncedEmitterEvent {
14+
case mutationEventApplied(MutationEvent)
15+
case mutationEventDropped(modelName: String)
16+
case modelSyncedEvent
17+
}
18+
1419
/// Listens to events published by both the `InitialSyncOrchestrator` and `IncomingEventReconciliationQueue`,
1520
/// and emits a `ModelSyncedEvent` when the initial sync is complete. This class expects
1621
/// `InitialSyncOrchestrator` and `IncomingEventReconciliationQueue` to have matching counts
17-
/// for the events they enqueue and process, respectively.
22+
/// for the events they enqueue and process, respectively. Always send back the reconciled event
23+
/// (`.mutationEventApplied`, `.mutationEventDropped`). The flow also provides a guaranteed sequence of events for the
24+
/// mutation event which causes the `ModelSyncedEvent` to be emitted afterwards by
25+
/// - Check if it `ModelSyncedEvent` should be emitted, if so, emit it.
26+
/// - Then send the mutation event which was used in the check above.
27+
@available(iOS 13.0, *)
1828
final class ModelSyncedEventEmitter {
1929
private let queue = DispatchQueue(label: "com.amazonaws.ModelSyncedEventEmitterQueue",
2030
target: DispatchQueue.global())
@@ -29,11 +39,15 @@ final class ModelSyncedEventEmitter {
2939

3040
private var modelSyncedEventBuilder: ModelSyncedEvent.Builder
3141

32-
private var modelSyncedEventTopic: PassthroughSubject<Never, Never>
33-
var publisher: AnyPublisher<Never, Never> {
42+
private var modelSyncedEventTopic: PassthroughSubject<IncomingModelSyncedEmitterEvent, Never>
43+
var publisher: AnyPublisher<IncomingModelSyncedEmitterEvent, Never> {
3444
return modelSyncedEventTopic.eraseToAnyPublisher()
3545
}
3646

47+
var shouldDispatchModelSyncedEvent: Bool {
48+
initialSyncOperationFinished && reconciledReceived == recordsReceived
49+
}
50+
3751
var dispatchedModelSyncedEvent: AtomicValue<Bool>
3852

3953
init(modelSchema: ModelSchema,
@@ -46,7 +60,7 @@ final class ModelSyncedEventEmitter {
4660
self.dispatchedModelSyncedEvent = AtomicValue(initialValue: false)
4761
self.modelSyncedEventBuilder = ModelSyncedEvent.Builder()
4862

49-
self.modelSyncedEventTopic = PassthroughSubject<Never, Never>()
63+
self.modelSyncedEventTopic = PassthroughSubject<IncomingModelSyncedEmitterEvent, Never>()
5064

5165
self.syncOrchestratorSink = initialSyncOrchestrator?
5266
.publisher
@@ -87,8 +101,8 @@ final class ModelSyncedEventEmitter {
87101
return modelSchema.name == event.modelName
88102
case .mutationEventDropped(let modelName):
89103
return modelSchema.name == modelName
90-
default:
91-
return true
104+
case .initialized, .started, .paused:
105+
return false
92106
}
93107
}
94108

@@ -108,6 +122,18 @@ final class ModelSyncedEventEmitter {
108122
}
109123

110124
private func onReceiveReconciliationEvent(value: IncomingEventReconciliationQueueEvent) {
125+
guard !dispatchedModelSyncedEvent.get() else {
126+
switch value {
127+
case .mutationEventApplied(let event):
128+
modelSyncedEventTopic.send(.mutationEventApplied(event))
129+
case .mutationEventDropped(let modelName):
130+
modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName))
131+
case .initialized, .started, .paused:
132+
return
133+
}
134+
return
135+
}
136+
111137
switch value {
112138
case .mutationEventApplied(let event):
113139
reconciledReceived += 1
@@ -121,14 +147,22 @@ final class ModelSyncedEventEmitter {
121147
default:
122148
log.error("Unexpected mutationType received: \(event.mutationType)")
123149
}
124-
case .mutationEventDropped:
150+
151+
if shouldDispatchModelSyncedEvent {
152+
dispatchModelSyncedEvent()
153+
}
154+
155+
modelSyncedEventTopic.send(.mutationEventApplied(event))
156+
case .mutationEventDropped(let modelName):
125157
reconciledReceived += 1
126-
default:
127-
return
128-
}
129158

130-
if initialSyncOperationFinished && reconciledReceived == recordsReceived {
131-
dispatchModelSyncedEvent()
159+
if shouldDispatchModelSyncedEvent {
160+
dispatchModelSyncedEvent()
161+
}
162+
163+
modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName))
164+
case .initialized, .started, .paused:
165+
return
132166
}
133167
}
134168

@@ -138,13 +172,8 @@ final class ModelSyncedEventEmitter {
138172
data: modelSyncedEventBuilder.build())
139173
Amplify.Hub.dispatch(to: .dataStore, payload: modelSyncedEventPayload)
140174
dispatchedModelSyncedEvent.set(true)
141-
modelSyncedEventTopic.send(completion: .finished)
142-
cancel()
143-
}
144-
145-
func cancel() {
175+
modelSyncedEventTopic.send(.modelSyncedEvent)
146176
syncOrchestratorSink?.cancel()
147-
reconciliationQueueSink?.cancel()
148177
}
149178
}
150179

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/SyncEventEmitter.swift

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,44 @@ import Amplify
99
import AWSPluginsCore
1010
import Combine
1111

12+
enum IncomingSyncEventEmitterEvent {
13+
case mutationEventApplied(MutationEvent)
14+
case mutationEventDropped(modelName: String)
15+
}
16+
17+
/// SyncEventEmitter holds onto one ModelSyncedEventEmitter per model. It counts the number of `modelSyncedEvent` to
18+
/// emit the `syncQueriesReady` and sends back the reconciliation events (`.mutationEventApplied`,
19+
/// `.mutationEventDropped`) to its subscribers.
1220
@available(iOS 13.0, *)
1321
final class SyncEventEmitter {
22+
private let queue = DispatchQueue(label: "com.amazonaws.SyncEventEmitter",
23+
target: DispatchQueue.global())
24+
1425
var modelSyncedEventEmitters: [String: ModelSyncedEventEmitter]
1526
var initialSyncCompleted: AnyCancellable?
1627

28+
private var syncableModels: Int
29+
private var modelSyncedReceived: Int
30+
31+
private var syncEventEmitterTopic: PassthroughSubject<IncomingSyncEventEmitterEvent, Never>
32+
var publisher: AnyPublisher<IncomingSyncEventEmitterEvent, Never> {
33+
return syncEventEmitterTopic.eraseToAnyPublisher()
34+
}
35+
36+
var shouldDispatchSyncQueriesReadyEvent: Bool {
37+
syncableModels == modelSyncedReceived
38+
}
39+
1740
init(initialSyncOrchestrator: InitialSyncOrchestrator?,
1841
reconciliationQueue: IncomingEventReconciliationQueue?) {
1942
self.modelSyncedEventEmitters = [String: ModelSyncedEventEmitter]()
43+
self.syncEventEmitterTopic = PassthroughSubject<IncomingSyncEventEmitterEvent, Never>()
44+
self.modelSyncedReceived = 0
2045

2146
let syncableModelSchemas = ModelRegistry.modelSchemas.filter { $0.isSyncable }
47+
self.syncableModels = syncableModelSchemas.count
2248

23-
var publishers = [AnyPublisher<Never, Never>]()
49+
var publishers = [AnyPublisher<IncomingModelSyncedEmitterEvent, Never>]()
2450
for syncableModelSchema in syncableModelSchemas {
2551
let modelSyncedEventEmitter = ModelSyncedEventEmitter(modelSchema: syncableModelSchema,
2652
initialSyncOrchestrator: initialSyncOrchestrator,
@@ -31,16 +57,31 @@ final class SyncEventEmitter {
3157

3258
self.initialSyncCompleted = Publishers
3359
.MergeMany(publishers)
34-
.sink(receiveCompletion: { [weak self] _ in
35-
self?.dispatchSyncQueriesReady()
36-
}, receiveValue: { _ in })
60+
.receive(on: queue)
61+
.sink(receiveCompletion: { _ in },
62+
receiveValue: { [weak self] value in
63+
self?.onReceiveModelSyncedEmitterEvent(value: value)
64+
})
65+
}
66+
67+
private func onReceiveModelSyncedEmitterEvent(value: IncomingModelSyncedEmitterEvent) {
68+
switch value {
69+
case .mutationEventApplied(let mutationEvent):
70+
syncEventEmitterTopic.send(.mutationEventApplied(mutationEvent))
71+
case .mutationEventDropped(let modelName):
72+
syncEventEmitterTopic.send(.mutationEventDropped(modelName: modelName))
73+
case .modelSyncedEvent:
74+
modelSyncedReceived += 1
75+
if shouldDispatchSyncQueriesReadyEvent {
76+
dispatchSyncQueriesReady()
77+
}
78+
}
3779
}
3880

3981
private func dispatchSyncQueriesReady() {
4082
let syncQueriesReadyEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.syncQueriesReady)
4183
Amplify.Hub.dispatch(to: .dataStore, payload: syncQueriesReadyEventPayload)
4284
}
43-
4485
}
4586

4687
@available(iOS 13.0, *)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ extension RemoteSyncEngine {
4646
}
4747
case .paused:
4848
remoteSyncTopicPublisher.send(.subscriptionsPaused)
49+
case .mutationEventDropped, .mutationEventApplied:
50+
break
51+
}
52+
}
53+
54+
func onReceive(receiveValue: IncomingSyncEventEmitterEvent) {
55+
switch receiveValue {
4956
case .mutationEventApplied(let mutationEvent):
5057
remoteSyncTopicPublisher.send(.mutationEvent(mutationEvent))
5158
case .mutationEventDropped:

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Retryable.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ extension RemoteSyncEngine {
2222
scheduleRestart(advice: advice)
2323
} else {
2424
remoteSyncTopicPublisher.send(completion: .failure(DataStoreError.api(error)))
25-
cancelEmitters()
2625
if let completionBlock = finishedCompletionBlock {
2726
completionBlock(.failure(causedBy: error))
2827
finishedCompletionBlock = nil

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine.swift

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
2929
private let initialSyncOrchestratorFactory: InitialSyncOrchestratorFactory
3030

3131
var syncEventEmitter: SyncEventEmitter?
32-
private var readyEventEmitter: ReadyEventEmitter?
32+
var readyEventEmitter: ReadyEventEmitter?
3333

3434
private let mutationEventIngester: MutationEventIngester
3535
let mutationEventPublisher: MutationEventPublisher
3636
private let outgoingMutationQueue: OutgoingMutationQueueBehavior
3737
private var outgoingMutationQueueSink: AnyCancellable?
3838

3939
private var reconciliationQueueSink: AnyCancellable?
40+
private var syncEventEmitterSink: AnyCancellable?
4041

4142
let remoteSyncTopicPublisher: PassthroughSubject<RemoteSyncEngineEvent, DataStoreError>
4243
var publisher: AnyPublisher<RemoteSyncEngineEvent, DataStoreError> {
@@ -230,7 +231,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
230231

231232
func terminate() {
232233
remoteSyncTopicPublisher.send(completion: .finished)
233-
cancelEmitters()
234+
cleanup()
234235
if let completionBlock = finishedCompletionBlock {
235236
completionBlock(.successfulVoid)
236237
finishedCompletionBlock = nil
@@ -305,6 +306,13 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
305306
syncEventEmitter = SyncEventEmitter(initialSyncOrchestrator: initialSyncOrchestrator,
306307
reconciliationQueue: reconciliationQueue)
307308

309+
syncEventEmitterSink = syncEventEmitter?
310+
.publisher
311+
.sink(
312+
receiveCompletion: { _ in },
313+
receiveValue: { [weak self] in self?.onReceive(receiveValue: $0) }
314+
)
315+
308316
readyEventEmitter = ReadyEventEmitter(
309317
remoteSyncEnginePublisher: publisher,
310318
completion: { }
@@ -358,17 +366,15 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
358366
}
359367

360368
private func cleanup(error: AmplifyError) {
361-
reconciliationQueue?.cancel()
362-
reconciliationQueue = nil
369+
cleanup()
363370
outgoingMutationQueue.stopSyncingToCloud {
364371
self.remoteSyncTopicPublisher.send(.cleanedUp)
365372
self.stateMachine.notify(action: .cleanedUp(error))
366373
}
367374
}
368375

369376
private func cleanupForTermination() {
370-
reconciliationQueue?.cancel()
371-
reconciliationQueue = nil
377+
cleanup()
372378
outgoingMutationQueue.stopSyncingToCloud {
373379
self.mutationEventPublisher.cancel()
374380
self.remoteSyncTopicPublisher.send(.cleanedUpForTermination)
@@ -394,8 +400,12 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
394400
}
395401

396402
/// Must be invoked from workQueue (as during a `respond` call)
397-
func cancelEmitters() {
403+
func cleanup() {
404+
reconciliationQueue?.cancel()
405+
reconciliationQueue = nil
406+
reconciliationQueueSink = nil
398407
syncEventEmitter = nil
408+
syncEventEmitterSink = nil
399409
readyEventEmitter = nil
400410
}
401411
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreHubEventsTests.swift

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,16 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
154154
}
155155

156156
if modelSyncedEvents.count == 2 {
157-
XCTAssertEqual(modelSyncedEvents[0].modelName, expectedSyncedModelNames[0])
158-
XCTAssertTrue(modelSyncedEvents[0].isFullSync)
159-
XCTAssertFalse(modelSyncedEvents[0].isDeltaSync)
160-
XCTAssertEqual(modelSyncedEvents[1].modelName, expectedSyncedModelNames[1])
161-
XCTAssertTrue(modelSyncedEvents[1].isFullSync)
162-
XCTAssertFalse(modelSyncedEvents[1].isDeltaSync)
157+
guard let postModelSyncedEvent = modelSyncedEvents.first(where: { $0.modelName == "Post" }),
158+
let commentModelSyncedEvent = modelSyncedEvents.first(where: { $0.modelName == "Comment" }) else {
159+
XCTFail("Could not get modelSyncedEvent for Post and Comment")
160+
return
161+
}
162+
163+
XCTAssertTrue(postModelSyncedEvent.isFullSync)
164+
XCTAssertFalse(postModelSyncedEvent.isDeltaSync)
165+
XCTAssertTrue(commentModelSyncedEvent.isFullSync)
166+
XCTAssertFalse(commentModelSyncedEvent.isDeltaSync)
163167
modelSyncedReceived.fulfill()
164168
}
165169
}

0 commit comments

Comments
 (0)