Skip to content

Commit 23eb7bc

Browse files
authored
fix(DataStore): do not send completion event to publisher on DataStore.clear() and DataStore.stop() (#1273)
* fix(Datastore): observe() stops working after clear() * Add test to check mutation events are received after clear() is called * Address review comments * Remove subscriber completion from DataStore.stop() * Address review comments
1 parent 67a1c61 commit 23eb7bc

File tree

3 files changed

+231
-41
lines changed

3 files changed

+231
-41
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/AWSDataStorePlugin+DataStoreBaseBehavior.swift

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,6 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
187187
storageEngineInitSemaphore.signal()
188188
storageEngine.stopSync { result in
189189
self.storageEngine = nil
190-
if #available(iOS 13.0, *) {
191-
self.dataStorePublisher?.sendFinished()
192-
}
193-
self.dataStorePublisher = nil
194190
completion(result)
195191
}
196192
}
@@ -205,10 +201,6 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
205201
storageEngineInitSemaphore.signal()
206202
storageEngine.clear { result in
207203
self.storageEngine = nil
208-
if #available(iOS 13.0, *) {
209-
self.dataStorePublisher?.sendFinished()
210-
}
211-
self.dataStorePublisher = nil
212204
completion(result)
213205
}
214206
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/AWSDataStorePlugin.swift

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,15 +153,11 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
153153

154154
@available(iOS 13.0, *)
155155
private func onReceiveCompletion(completed: Subscribers.Completion<DataStoreError>) {
156-
guard let dataStorePublisher = self.dataStorePublisher else {
157-
log.error("Data store publisher not initalized")
158-
return
159-
}
160156
switch completed {
161157
case .failure(let dataStoreError):
162-
dataStorePublisher.send(dataStoreError: dataStoreError)
158+
log.error("StorageEngine completed with error: \(dataStoreError)")
163159
case .finished:
164-
dataStorePublisher.sendFinished()
160+
break
165161
}
166162
}
167163

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Core/AWSAPICategoryPluginTests.swift

Lines changed: 229 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//
77

88
import XCTest
9+
import AmplifyTestCommon
910

1011
@testable import Amplify
1112
@testable import AWSDataStoreCategoryPlugin
@@ -106,21 +107,35 @@ class AWSAPICategoryPluginTests: XCTestCase {
106107
dataStorePublisher: dataStorePublisher,
107108
validAPIPluginKey: "MockAPICategoryPlugin",
108109
validAuthPluginKey: "MockAuthCategoryPlugin")
110+
111+
let finishNotReceived = expectation(
112+
description: "publisher should not receive .finished completion event after stop() is called")
113+
finishNotReceived.isInverted = true
114+
109115
do {
110116
try plugin.configure(using: nil)
111-
XCTAssert(plugin.storageEngine == nil)
117+
XCTAssertNil(plugin.storageEngine)
118+
119+
let sink = plugin.publisher.sink { completion in
120+
switch completion {
121+
case .finished:
122+
finishNotReceived.fulfill()
123+
case .failure(let error):
124+
XCTFail("Error \(error)")
125+
}
126+
} receiveValue: { _ in }
112127

113128
let semaphore = DispatchSemaphore(value: 0)
114129
plugin.start(completion: {_ in
115-
XCTAssert(plugin.storageEngine != nil)
116-
XCTAssert(plugin.dataStorePublisher != nil)
130+
XCTAssertNotNil(plugin.storageEngine)
131+
XCTAssertNotNil(plugin.dataStorePublisher)
117132
semaphore.signal()
118133
})
119134
semaphore.wait()
120135

121136
plugin.stop(completion: { _ in
122-
XCTAssert(plugin.storageEngine == nil)
123-
XCTAssert(plugin.dataStorePublisher == nil)
137+
XCTAssertNil(plugin.storageEngine)
138+
XCTAssertNotNil(plugin.dataStorePublisher)
124139
semaphore.signal()
125140
})
126141
semaphore.wait()
@@ -130,14 +145,14 @@ class AWSAPICategoryPluginTests: XCTestCase {
130145
}
131146

132147
plugin.start(completion: { _ in
133-
XCTAssert(plugin.storageEngine != nil)
134-
XCTAssert(plugin.dataStorePublisher != nil)
148+
XCTAssertNotNil(plugin.storageEngine)
149+
XCTAssertNotNil(plugin.dataStorePublisher)
135150
})
136-
151+
waitForExpectations(timeout: 1.0)
152+
sink.cancel()
137153
} catch {
138154
XCTFail("DataStore configuration should not fail with nil configuration. \(error)")
139155
}
140-
waitForExpectations(timeout: 1.0)
141156
}
142157

143158
func testStorageEngineStartClearStart() throws {
@@ -161,21 +176,35 @@ class AWSAPICategoryPluginTests: XCTestCase {
161176
dataStorePublisher: dataStorePublisher,
162177
validAPIPluginKey: "MockAPICategoryPlugin",
163178
validAuthPluginKey: "MockAuthCategoryPlugin")
179+
180+
let finishNotReceived = expectation(
181+
description: "publisher should not receive .finished completion event after clear() is called")
182+
finishNotReceived.isInverted = true
183+
164184
do {
165185
try plugin.configure(using: nil)
166-
XCTAssert(plugin.storageEngine == nil)
186+
XCTAssertNil(plugin.storageEngine)
187+
188+
let sink = plugin.publisher.sink { completion in
189+
switch completion {
190+
case .finished:
191+
finishNotReceived.fulfill()
192+
case .failure(let error):
193+
XCTFail("Error \(error)")
194+
}
195+
} receiveValue: { _ in }
167196

168197
let semaphore = DispatchSemaphore(value: 0)
169198
plugin.start(completion: {_ in
170-
XCTAssert(plugin.storageEngine != nil)
171-
XCTAssert(plugin.dataStorePublisher != nil)
199+
XCTAssertNotNil(plugin.storageEngine)
200+
XCTAssertNotNil(plugin.dataStorePublisher)
172201
semaphore.signal()
173202
})
174203
semaphore.wait()
175204

176205
plugin.clear(completion: { _ in
177-
XCTAssert(plugin.storageEngine == nil)
178-
XCTAssert(plugin.dataStorePublisher == nil)
206+
XCTAssertNil(plugin.storageEngine)
207+
XCTAssertNotNil(plugin.dataStorePublisher)
179208
semaphore.signal()
180209
})
181210
semaphore.wait()
@@ -184,14 +213,14 @@ class AWSAPICategoryPluginTests: XCTestCase {
184213
}
185214

186215
plugin.start(completion: { _ in
187-
XCTAssert(plugin.storageEngine != nil)
188-
XCTAssert(plugin.dataStorePublisher != nil)
216+
XCTAssertNotNil(plugin.storageEngine)
217+
XCTAssertNotNil(plugin.dataStorePublisher)
189218
})
190-
219+
waitForExpectations(timeout: 1.0)
220+
sink.cancel()
191221
} catch {
192222
XCTFail("DataStore configuration should not fail with nil configuration. \(error)")
193223
}
194-
waitForExpectations(timeout: 1.0)
195224
}
196225

197226
func testStorageEngineQueryClearQuery() throws {
@@ -216,21 +245,35 @@ class AWSAPICategoryPluginTests: XCTestCase {
216245
dataStorePublisher: dataStorePublisher,
217246
validAPIPluginKey: "MockAPICategoryPlugin",
218247
validAuthPluginKey: "MockAuthCategoryPlugin")
248+
249+
let finishNotReceived = expectation(
250+
description: "publisher should not receive .finished completion event after clear() is called")
251+
finishNotReceived.isInverted = true
252+
219253
do {
220254
try plugin.configure(using: nil)
221-
XCTAssert(plugin.storageEngine == nil)
255+
XCTAssertNil(plugin.storageEngine)
256+
257+
let sink = plugin.publisher.sink { completion in
258+
switch completion {
259+
case .finished:
260+
finishNotReceived.fulfill()
261+
case .failure(let error):
262+
XCTFail("Error \(error)")
263+
}
264+
} receiveValue: { _ in }
222265

223266
let semaphore = DispatchSemaphore(value: 0)
224267
plugin.query(ExampleWithEveryType.self, completion: {_ in
225-
XCTAssert(plugin.storageEngine != nil)
226-
XCTAssert(plugin.dataStorePublisher != nil)
268+
XCTAssertNotNil(plugin.storageEngine)
269+
XCTAssertNotNil(plugin.dataStorePublisher)
227270
semaphore.signal()
228271
})
229272
semaphore.wait()
230273

231274
plugin.clear(completion: { _ in
232-
XCTAssert(plugin.storageEngine == nil)
233-
XCTAssert(plugin.dataStorePublisher == nil)
275+
XCTAssertNil(plugin.storageEngine)
276+
XCTAssertNotNil(plugin.dataStorePublisher)
234277
semaphore.signal()
235278
})
236279
semaphore.wait()
@@ -239,14 +282,14 @@ class AWSAPICategoryPluginTests: XCTestCase {
239282
}
240283

241284
plugin.query(ExampleWithEveryType.self, completion: { _ in
242-
XCTAssert(plugin.storageEngine != nil)
243-
XCTAssert(plugin.dataStorePublisher != nil)
285+
XCTAssertNotNil(plugin.storageEngine)
286+
XCTAssertNotNil(plugin.dataStorePublisher)
244287
})
245-
288+
waitForExpectations(timeout: 1.0)
289+
sink.cancel()
246290
} catch {
247291
XCTFail("DataStore configuration should not fail with nil configuration. \(error)")
248292
}
249-
waitForExpectations(timeout: 1.0)
250293
}
251294

252295
func expect(_ expectation: XCTestExpectation, _ currCount: Int, _ expectedCount: Int) -> Int {
@@ -256,4 +299,163 @@ class AWSAPICategoryPluginTests: XCTestCase {
256299
}
257300
return count
258301
}
302+
303+
/// - Given: Datastore plugin is initialized
304+
/// - When:
305+
/// - plugin.start() is called
306+
/// - plugin.clear() is called
307+
/// - a mutation event is sent
308+
/// - Then: The subscriber to plugin's publisher should receive the mutation
309+
310+
func testStorageEngineStartClearSend() {
311+
let startExpectation = expectation(description: "Start Sync should be called with start")
312+
let clearExpectation = expectation(description: "Clear should be called")
313+
314+
var count = 0
315+
let storageEngine = MockStorageEngineBehavior()
316+
storageEngine.responders[.startSync] = StartSyncResponder { _ in
317+
count = self.expect(startExpectation, count, 1)
318+
}
319+
storageEngine.responders[.clear] = ClearResponder { _ in
320+
count = self.expect(clearExpectation, count, 2)
321+
}
322+
323+
let storageEngineBehaviorFactory: StorageEngineBehaviorFactory = {_, _, _, _, _, _ throws in
324+
return storageEngine
325+
}
326+
let dataStorePublisher = DataStorePublisher()
327+
let plugin = AWSDataStorePlugin(modelRegistration: TestModelRegistration(),
328+
storageEngineBehaviorFactory: storageEngineBehaviorFactory,
329+
dataStorePublisher: dataStorePublisher,
330+
validAPIPluginKey: "MockAPICategoryPlugin",
331+
validAuthPluginKey: "MockAuthCategoryPlugin")
332+
333+
let finishNotReceived = expectation(
334+
description: "publisher should not receive .finished completion event after clear() is called")
335+
finishNotReceived.isInverted = true
336+
337+
let publisherReceivedValue = expectation(
338+
description: "publisher should receive a value when mutation event is sent")
339+
340+
do {
341+
try plugin.configure(using: nil)
342+
XCTAssertNil(plugin.storageEngine)
343+
344+
let sink = plugin.publisher.sink { completion in
345+
switch completion {
346+
case .finished:
347+
finishNotReceived.fulfill()
348+
case .failure(let error):
349+
XCTFail("Error \(error)")
350+
}
351+
} receiveValue: { event in
352+
XCTAssertEqual(event.modelId, "12345")
353+
publisherReceivedValue.fulfill()
354+
}
355+
356+
let semaphore = DispatchSemaphore(value: 0)
357+
plugin.start(completion: {_ in
358+
XCTAssertNotNil(plugin.storageEngine)
359+
XCTAssertNotNil(plugin.dataStorePublisher)
360+
semaphore.signal()
361+
})
362+
semaphore.wait()
363+
364+
plugin.clear(completion: { _ in
365+
XCTAssertNil(plugin.storageEngine)
366+
XCTAssertNotNil(plugin.dataStorePublisher)
367+
semaphore.signal()
368+
})
369+
semaphore.wait()
370+
371+
let mockModel = MockSynced(id: "12345")
372+
try plugin.dataStorePublisher?.send(input: MutationEvent(model: mockModel,
373+
modelSchema: mockModel.schema,
374+
mutationType: .create))
375+
376+
waitForExpectations(timeout: 1.0)
377+
sink.cancel()
378+
} catch {
379+
XCTFail("DataStore configuration should not fail with nil configuration. \(error)")
380+
}
381+
}
382+
383+
/// - Given: Datastore plugin is initialized
384+
/// - When:
385+
/// - plugin.start() is called
386+
/// - plugin.stop() is called
387+
/// - a mutation event is sent
388+
/// - Then: The subscriber to plugin's publisher should receive the mutation
389+
func testStorageEngineStartStopSend() {
390+
let startExpectation = expectation(description: "Start Sync should be called with start")
391+
let stopExpectation = expectation(description: "Stop should be called")
392+
393+
var count = 0
394+
let storageEngine = MockStorageEngineBehavior()
395+
storageEngine.responders[.startSync] = StartSyncResponder { _ in
396+
count = self.expect(startExpectation, count, 1)
397+
}
398+
storageEngine.responders[.stopSync] = StopSyncResponder { _ in
399+
count = self.expect(stopExpectation, count, 2)
400+
}
401+
402+
let storageEngineBehaviorFactory: StorageEngineBehaviorFactory = {_, _, _, _, _, _ throws in
403+
return storageEngine
404+
}
405+
let dataStorePublisher = DataStorePublisher()
406+
let plugin = AWSDataStorePlugin(modelRegistration: TestModelRegistration(),
407+
storageEngineBehaviorFactory: storageEngineBehaviorFactory,
408+
dataStorePublisher: dataStorePublisher,
409+
validAPIPluginKey: "MockAPICategoryPlugin",
410+
validAuthPluginKey: "MockAuthCategoryPlugin")
411+
412+
let finishNotReceived = expectation(
413+
description: "publisher should not receive .finished completion event after stop() is called")
414+
finishNotReceived.isInverted = true
415+
416+
let publisherReceivedValue = expectation(
417+
description: "publisher should receive a value when mutation event is sent")
418+
419+
do {
420+
try plugin.configure(using: nil)
421+
XCTAssertNil(plugin.storageEngine)
422+
423+
let sink = plugin.publisher.sink { completion in
424+
switch completion {
425+
case .finished:
426+
finishNotReceived.fulfill()
427+
case .failure(let error):
428+
XCTFail("Error \(error)")
429+
}
430+
} receiveValue: { event in
431+
XCTAssertEqual(event.modelId, "12345")
432+
publisherReceivedValue.fulfill()
433+
}
434+
435+
let semaphore = DispatchSemaphore(value: 0)
436+
plugin.start(completion: {_ in
437+
XCTAssertNotNil(plugin.storageEngine)
438+
XCTAssertNotNil(plugin.dataStorePublisher)
439+
semaphore.signal()
440+
})
441+
semaphore.wait()
442+
443+
plugin.stop(completion: { _ in
444+
XCTAssertNil(plugin.storageEngine)
445+
XCTAssertNotNil(plugin.dataStorePublisher)
446+
semaphore.signal()
447+
})
448+
semaphore.wait()
449+
450+
let mockModel = MockSynced(id: "12345")
451+
try plugin.dataStorePublisher?.send(input: MutationEvent(model: mockModel,
452+
modelSchema: mockModel.schema,
453+
mutationType: .create))
454+
455+
waitForExpectations(timeout: 1.0)
456+
sink.cancel()
457+
} catch {
458+
XCTFail("DataStore configuration should not fail with nil configuration. \(error)")
459+
}
460+
}
259461
}

0 commit comments

Comments
 (0)