Skip to content

Commit f8e0429

Browse files
authored
fix(DataStore): model reconciliation queue publisher (#756)
1 parent 723f8a2 commit f8e0429

File tree

3 files changed

+104
-10
lines changed

3 files changed

+104
-10
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
6464
private let modelName: String
6565

6666
private var incomingEventsSink: AnyCancellable?
67-
private var reconcileAndLocalSaveOperationSink: AnyCancellable?
67+
private var reconcileAndLocalSaveOperationSinks: Set<AnyCancellable?>
6868

6969
private let modelReconciliationQueueSubject: PassthroughSubject<ModelReconciliationQueueEvent, DataStoreError>
7070
var publisher: AnyPublisher<ModelReconciliationQueueEvent, DataStoreError> {
@@ -98,7 +98,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
9898
let resolvedIncomingSubscriptionEvents = incomingSubscriptionEvents ??
9999
AWSIncomingSubscriptionEventPublisher(modelType: modelType, api: api, auth: auth)
100100
self.incomingSubscriptionEvents = resolvedIncomingSubscriptionEvents
101-
101+
self.reconcileAndLocalSaveOperationSinks = Set<AnyCancellable?>()
102102
self.incomingEventsSink = resolvedIncomingSubscriptionEvents
103103
.publisher
104104
.sink(receiveCompletion: { [weak self] completion in
@@ -133,11 +133,17 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
133133
func enqueue(_ remoteModel: MutationSync<AnyModel>) {
134134
let reconcileOp = ReconcileAndLocalSaveOperation(remoteModel: remoteModel,
135135
storageAdapter: storageAdapter)
136-
reconcileAndLocalSaveOperationSink = reconcileOp.publisher.sink(receiveCompletion: { error in
137-
self.modelReconciliationQueueSubject.send(completion: error)
136+
var reconcileAndLocalSaveOperationSink: AnyCancellable?
137+
138+
reconcileAndLocalSaveOperationSink = reconcileOp.publisher.sink(receiveCompletion: { completion in
139+
self.reconcileAndLocalSaveOperationSinks.remove(reconcileAndLocalSaveOperationSink)
140+
if case .failure = completion {
141+
self.modelReconciliationQueueSubject.send(completion: completion)
142+
}
138143
}, receiveValue: { mutationEvent in
139144
self.modelReconciliationQueueSubject.send(.mutationEvent(mutationEvent))
140145
})
146+
reconcileAndLocalSaveOperationSinks.insert(reconcileAndLocalSaveOperationSink)
141147
reconcileAndSaveQueue.addOperation(reconcileOp)
142148
}
143149

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,12 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
9696
case .inError(let error):
9797
// Maybe we have to notify the Hub?
9898
log.error(error: error)
99+
notifyFinished()
99100
finish()
100101

101102
case .finished:
102103
// Maybe we have to notify the Hub?
104+
notifyFinished()
103105
finish()
104106

105107
}
@@ -301,6 +303,10 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
301303
stateMachine.notify(action: .notified)
302304
}
303305

306+
private func notifyFinished() {
307+
mutationEventPublisher.send(completion: .finished)
308+
}
309+
304310
private func getPendingMutations(forModelId modelId: Model.Identifier) -> DataStoreResult<[MutationEvent]> {
305311
guard let storageAdapter = storageAdapter else {
306312
return .failure(DataStoreError.nilStorageAdapter())

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/SubscriptionSync/ModelReconciliationQueueBehaviorTests.swift

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,34 @@ class ModelReconciliationQueueBehaviorTests: ReconciliationQueueTestBase {
8989
subscriptionEventsSubject.send(.mutationEvent(mutationSync))
9090
}
9191

92+
let eventsSentViaPublisher1 = expectation(description: "id-1 sent via publisher")
93+
let eventsSentViaPublisher2 = expectation(description: "id-2 sent via publisher")
94+
let eventsSentViaPublisher3 = expectation(description: "id-3 sent via publisher")
95+
let queueSink = queue.publisher.sink(receiveCompletion: { _ in
96+
XCTFail("Not expecting a call to completion")
97+
}, receiveValue: { event in
98+
if case let .mutationEvent(mutationEvent) = event {
99+
switch mutationEvent.modelId {
100+
case "id-1":
101+
eventsSentViaPublisher1.fulfill()
102+
case "id-2":
103+
eventsSentViaPublisher2.fulfill()
104+
case "id-3":
105+
eventsSentViaPublisher3.fulfill()
106+
default:
107+
XCTFail("Not expecting a call to default")
108+
}
109+
}
110+
})
111+
92112
queue.start()
93113

94-
wait(for: [event1Saved, event2Saved, event3Saved], timeout: 5.0, enforceOrder: true)
114+
wait(for: [event1Saved,
115+
event2Saved,
116+
event3Saved], timeout: 5.0, enforceOrder: true)
117+
wait(for: [eventsSentViaPublisher1,
118+
eventsSentViaPublisher2,
119+
eventsSentViaPublisher3], timeout: 2.0)
95120
}
96121

97122
/// - Given: An AWSModelReconciliationQueue that has been buffering events
@@ -156,9 +181,33 @@ class ModelReconciliationQueueBehaviorTests: ReconciliationQueueTestBase {
156181
subscriptionEventsSubject.send(.mutationEvent(mutationSync))
157182
}
158183

184+
let eventsSentViaPublisher1 = expectation(description: "id-1 sent via publisher")
185+
let eventsSentViaPublisher2 = expectation(description: "id-2 sent via publisher")
186+
let eventsSentViaPublisher3 = expectation(description: "id-3 sent via publisher")
187+
188+
let queueSink = queue.publisher.sink(receiveCompletion: { _ in
189+
XCTFail("Not expecting a call to completion")
190+
}, receiveValue: { event in
191+
if case let .mutationEvent(mutationEvent) = event {
192+
switch mutationEvent.modelId {
193+
case "id-1":
194+
eventsSentViaPublisher1.fulfill()
195+
case "id-2":
196+
eventsSentViaPublisher2.fulfill()
197+
case "id-3":
198+
eventsSentViaPublisher3.fulfill()
199+
default:
200+
break
201+
}
202+
}
203+
})
204+
159205
queue.start()
160206

161-
wait(for: [allEventsProcessed], timeout: 5.0)
207+
wait(for: [allEventsProcessed,
208+
eventsSentViaPublisher1,
209+
eventsSentViaPublisher2,
210+
eventsSentViaPublisher3], timeout: 5.0)
162211
}
163212

164213
/// - Given: A started AWSModelReconciliationQueue with no pending events
@@ -202,15 +251,35 @@ class ModelReconciliationQueueBehaviorTests: ReconciliationQueueTestBase {
202251
subscriptionEventsSubject.send(.mutationEvent(mutationSync))
203252
}
204253

254+
let eventsSentViaPublisher1 = expectation(description: "id-1 sent via publisher")
255+
let eventsSentViaPublisher2 = expectation(description: "id-2 sent via publisher")
256+
var queueSink = queue.publisher.sink(receiveCompletion: { _ in
257+
XCTFail("Not expecting a call to completion")
258+
}, receiveValue: { event in
259+
if case let .mutationEvent(mutationEvent) = event {
260+
switch mutationEvent.modelId {
261+
case "id-1":
262+
eventsSentViaPublisher1.fulfill()
263+
case "id-2":
264+
eventsSentViaPublisher2.fulfill()
265+
default:
266+
XCTFail("Not expecting a call to default")
267+
}
268+
}
269+
})
270+
205271
queue.start()
206272

207-
wait(for: [event1ShouldBeProcessed, event2ShouldBeProcessed], timeout: 1.0)
273+
wait(for: [event1ShouldBeProcessed,
274+
event2ShouldBeProcessed,
275+
eventsSentViaPublisher1,
276+
eventsSentViaPublisher2], timeout: 1.0)
208277

209278
let event1ShouldNotBeProcessed = expectation(description: "Event 1 should not be processed")
210279
event1ShouldNotBeProcessed.isInverted = true
211280
let event2ShouldNotBeProcessed = expectation(description: "Event 2 should not be processed")
212281
event2ShouldNotBeProcessed.isInverted = true
213-
let event3ShouldBeProcessed = expectation(description: "Event 3 should not be processed")
282+
let event3ShouldBeProcessed = expectation(description: "Event 3 should be processed")
214283
storageAdapter.responders[.saveModelCompletion] =
215284
SaveModelCompletionResponder<MutationSyncMetadata> { model, completion in
216285
switch model.id {
@@ -226,6 +295,17 @@ class ModelReconciliationQueueBehaviorTests: ReconciliationQueueTestBase {
226295
completion(.success(model))
227296
}
228297

298+
let eventsSentViaPublisher3 = expectation(description: "id-3 sent via publisher")
299+
queueSink = queue.publisher.sink(receiveCompletion: { _ in
300+
XCTFail("Not expecting a call to completion")
301+
}, receiveValue: { event in
302+
if case let .mutationEvent(mutationEvent) = event {
303+
if mutationEvent.modelId == "id-3" {
304+
eventsSentViaPublisher3.fulfill()
305+
}
306+
}
307+
})
308+
229309
let model = try MockSynced(id: "id-3").eraseToAnyModel()
230310
let syncMetadata = MutationSyncMetadata(id: model.id,
231311
deleted: false,
@@ -234,8 +314,10 @@ class ModelReconciliationQueueBehaviorTests: ReconciliationQueueTestBase {
234314
let mutationSync = MutationSync(model: model, syncMetadata: syncMetadata)
235315
subscriptionEventsSubject.send(.mutationEvent(mutationSync))
236316

237-
wait(for: [event1ShouldNotBeProcessed, event2ShouldNotBeProcessed, event3ShouldBeProcessed], timeout: 1.0)
238-
317+
wait(for: [event1ShouldNotBeProcessed,
318+
event2ShouldNotBeProcessed,
319+
event3ShouldBeProcessed,
320+
eventsSentViaPublisher3], timeout: 1.0)
239321
}
240322

241323
}

0 commit comments

Comments
 (0)