Skip to content

Commit 2c4f85a

Browse files
authored
fix(datastore-v1): observeQuery snapshot on .modelSynced event (#2367)
* fix(datastore): observeQuery snapshot on .modelSynced event * chore(datastore): fix integ tests on DataStoreObserveQueryTests * chore(datastore-v1): address PR comments
1 parent 98ef362 commit 2c4f85a

File tree

4 files changed

+200
-11
lines changed

4 files changed

+200
-11
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/AWSDataStorePlugin.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,10 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
236236
dataStorePublisher.send(input: mutationEvent)
237237
case .modelSyncedEvent(let modelSyncedEvent):
238238
log.verbose("Emitting DataStore event: modelSyncedEvent \(modelSyncedEvent)")
239+
dispatchedModelSyncedEvents[modelSyncedEvent.modelName]?.set(true)
239240
let modelSyncedEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
240241
data: modelSyncedEvent)
241242
Amplify.Hub.dispatch(to: .dataStore, payload: modelSyncedEventPayload)
242-
dispatchedModelSyncedEvents[modelSyncedEvent.modelName]?.set(true)
243243
case .syncQueriesReadyEvent:
244244
log.verbose("[Lifecycle event 4]: syncQueriesReady")
245245
let syncQueriesReadyEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.syncQueriesReady)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Subscribe/DataStoreObserveQueryOperation.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
131131
var currentItems: SortedList<M>
132132
var batchItemsChangedSink: AnyCancellable?
133133
var itemsChangedSink: AnyCancellable?
134+
var modelSyncedEventSink: AnyCancellable?
134135

135136
/// Internal publisher for `ObserveQueryPublisher` to pass events back to subscribers
136137
let passthroughPublisher: PassthroughSubject<DataStoreQuerySnapshot<M>, DataStoreError>
@@ -182,6 +183,10 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
182183
if let batchItemsChangedSink = batchItemsChangedSink {
183184
batchItemsChangedSink.cancel()
184185
}
186+
187+
if let modelSyncedEventSink = modelSyncedEventSink {
188+
modelSyncedEventSink.cancel()
189+
}
185190
passthroughPublisher.send(completion: .finished)
186191
super.cancel()
187192
finish()
@@ -198,6 +203,7 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
198203
self.currentItems.reset()
199204
self.itemsChangedSink = nil
200205
self.batchItemsChangedSink = nil
206+
self.modelSyncedEventSink = nil
201207
}
202208
}
203209

@@ -250,6 +256,7 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
250256
switch queryResult {
251257
case .success(let queriedModels):
252258
currentItems.set(sortedModels: queriedModels)
259+
subscribeToModelSyncedEvent()
253260
sendSnapshot()
254261
case .failure(let error):
255262
self.passthroughPublisher.send(completion: .failure(error))
@@ -284,6 +291,18 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
284291
receiveValue: onItemChangeAfterSync(mutationEvent:))
285292
}
286293

294+
func subscribeToModelSyncedEvent() {
295+
modelSyncedEventSink = Amplify.Hub.publisher(for: .dataStore).sink { event in
296+
if event.eventName == HubPayload.EventName.DataStore.modelSynced,
297+
let modelSyncedEvent = event.data as? ModelSyncedEvent,
298+
modelSyncedEvent.modelName == self.modelSchema.name {
299+
self.serialQueue.async {
300+
self.sendSnapshot()
301+
}
302+
}
303+
}
304+
}
305+
287306
func filterByModelName(mutationEvent: MutationEvent) -> Bool {
288307
// Filter in the model when it matches the model name for this operation
289308
mutationEvent.modelName == modelSchema.name

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreObserveQueryTests.swift

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,50 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase {
6666
sink.cancel()
6767
}
6868

69+
/// ObserveQuery API will eventually return a snapshot when sync state is toggled from false to true.
70+
/// A `.modelSynced` event from the hub is internally received
71+
///
72+
/// - Given: DataStore is cleared
73+
/// - When:
74+
/// - ObserveQuery API is called to start the sync engine
75+
/// - A model is saved but not yet synced
76+
/// - Then:
77+
/// - A query snapshot is received on `.modelSynced`
78+
///
79+
func testObserveQueryWhenModelSyncedEvent() throws {
80+
setUp(withModels: TestModelRegistration())
81+
try startAmplify()
82+
clearDataStore()
83+
var snapshots = [DataStoreQuerySnapshot<Post>]()
84+
var isObserveQueryReadyForTest = false
85+
let observeQueryReadyForTest = expectation(description: "received query snapshot with .isSynced true")
86+
let snapshotWithPost = expectation(description: "received first snapshot")
87+
let post = Post(title: "title", content: "content", createdAt: .now())
88+
let sink = Amplify.DataStore.observeQuery(for: Post.self).sink { completed in
89+
switch completed {
90+
case .finished:
91+
break
92+
case .failure(let error):
93+
XCTFail("\(error)")
94+
}
95+
} receiveValue: { querySnapshot in
96+
snapshots.append(querySnapshot)
97+
if !isObserveQueryReadyForTest && querySnapshot.isSynced {
98+
isObserveQueryReadyForTest = true
99+
observeQueryReadyForTest.fulfill()
100+
}
101+
if querySnapshot.items.contains(where: { $0.id == post.id }) {
102+
snapshotWithPost.fulfill()
103+
}
104+
}
105+
106+
wait(for: [observeQueryReadyForTest], timeout: 100)
107+
108+
_ = Amplify.DataStore.save(post)
109+
wait(for: [snapshotWithPost], timeout: 100)
110+
sink.cancel()
111+
}
112+
69113
/// Apply a query predicate "title begins with 'xyz'"
70114
///
71115
/// - Given: DataStore is set up with an empty local store
@@ -83,6 +127,7 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase {
83127
clearDataStore()
84128
var snapshots = [DataStoreQuerySnapshot<Post>]()
85129
let snapshotWithIsSynced = expectation(description: "query snapshot with isSynced true")
130+
let receivedPostFromObserveQuery = expectation(description: "received Post")
86131
snapshotWithIsSynced.assertForOverFulfill = false
87132
var snapshotWithIsSyncedFulfilled = false
88133
let predicate = Post.keys.title.beginsWith("xyz")
@@ -94,18 +139,21 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase {
94139
XCTFail("\(error)")
95140
}
96141
} receiveValue: { querySnapshot in
97-
if !snapshotWithIsSyncedFulfilled {
98-
snapshots.append(querySnapshot)
99-
100-
if querySnapshot.isSynced {
101-
snapshotWithIsSyncedFulfilled = true
102-
snapshotWithIsSynced.fulfill()
142+
snapshots.append(querySnapshot)
143+
if !snapshotWithIsSyncedFulfilled && querySnapshot.isSynced {
144+
snapshotWithIsSyncedFulfilled = true
145+
snapshotWithIsSynced.fulfill()
146+
} else if snapshotWithIsSyncedFulfilled {
147+
if querySnapshot.items.count >= 4 && querySnapshot.items.contains(where: { post in
148+
post.title.contains("xyz")
149+
}) {
150+
receivedPostFromObserveQuery.fulfill()
103151
}
104152
}
105153
}
106154

107-
savePostAndWaitForSync(Post(title: "xyz", content: "content", createdAt: .now()))
108-
wait(for: [snapshotWithIsSynced], timeout: 100)
155+
savePostAndWaitForSync(Post(title: "xyz 4", content: "content", createdAt: .now()))
156+
wait(for: [snapshotWithIsSynced, receivedPostFromObserveQuery], timeout: 200)
109157
XCTAssertTrue(snapshots.count >= 2)
110158
XCTAssertFalse(snapshots[0].isSynced)
111159
log.info("\(snapshots)")
@@ -213,7 +261,7 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase {
213261
/// - Delete a model that does NOT match the predicate. No snapshot is emitted
214262
func testPredicateWithCreateUpdateDelete() throws {
215263
setUp(withModels: TestModelRegistration(), logLevel: .info)
216-
try startAmplify()
264+
try startAmplifyAndWaitForReady()
217265

218266
let testId = UUID().uuidString
219267
let postMatchPredicate = Post(title: "xyz 1", content: testId, createdAt: .now())
@@ -307,7 +355,7 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase {
307355
///
308356
func testSortWithCreateUpdateDelete() throws {
309357
setUp(withModels: TestModelRegistration(), logLevel: .info)
310-
try startAmplify()
358+
try startAmplifyAndWaitForReady()
311359

312360
let testId = UUID().uuidString
313361
var snapshotCount = 0

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Subscribe/DataStoreObserveQueryOperationTests.swift

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,128 @@ class DataStoreObserveQueryOperationTests: XCTestCase {
7777
sink.cancel()
7878
}
7979

80+
/// ObserveQuery will send a single snapshot when the sync state toggles
81+
/// from false to true. The operation internally listens to `.modelSynced` event from
82+
/// the Hub.
83+
///
84+
/// - Given: ObserveQuery has started and the first snapshot has been received.
85+
/// - When:
86+
/// - modelSyncedEvent is sent to the Hub
87+
/// - Then:
88+
/// - ObserveQuery will send a second snapshot
89+
///
90+
func testGenerateSnapshotOnObserveQueryWhenModelSynced() throws {
91+
let firstSnapshot = expectation(description: "first query snapshots")
92+
let secondSnapshot = expectation(description: "second query snapshots")
93+
let thirdSnapshot = expectation(description: "third query snapshot")
94+
thirdSnapshot.isInverted = true
95+
96+
var querySnapshots = [DataStoreQuerySnapshot<Post>]()
97+
let dispatchedModelSyncedEvent = AtomicValue(initialValue: false)
98+
let operation = AWSDataStoreObserveQueryOperation(
99+
modelType: Post.self,
100+
modelSchema: Post.schema,
101+
predicate: nil,
102+
sortInput: nil,
103+
storageEngine: storageEngine,
104+
dataStorePublisher: dataStorePublisher,
105+
dataStoreConfiguration: .default,
106+
dispatchedModelSyncedEvent: dispatchedModelSyncedEvent)
107+
108+
let sink = operation.publisher.sink { completed in
109+
switch completed {
110+
case .finished:
111+
break
112+
case .failure(let error):
113+
XCTFail("Failed with error \(error)")
114+
}
115+
} receiveValue: { querySnapshot in
116+
querySnapshots.append(querySnapshot)
117+
if querySnapshots.count == 1 {
118+
firstSnapshot.fulfill()
119+
} else if querySnapshots.count == 2 {
120+
secondSnapshot.fulfill()
121+
} else if querySnapshots.count == 3 {
122+
XCTFail("Should not receive third snapshot for a Model change")
123+
thirdSnapshot.fulfill()
124+
}
125+
}
126+
let queue = OperationQueue()
127+
queue.addOperation(operation)
128+
wait(for: [firstSnapshot], timeout: 5)
129+
130+
dispatchedModelSyncedEvent.set(true)
131+
var modelSyncedEventBuilder = ModelSyncedEvent.Builder()
132+
modelSyncedEventBuilder.modelName = Post.modelName
133+
let modelSyncedEvent = modelSyncedEventBuilder.build()
134+
let modelSyncedEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
135+
data: modelSyncedEvent)
136+
Amplify.Hub.dispatch(to: .dataStore, payload: modelSyncedEventPayload)
137+
wait(for: [secondSnapshot], timeout: 10)
138+
139+
modelSyncedEventBuilder.modelName = "modelNameNotMatch"
140+
let modelSyncedEventNotMatch = modelSyncedEventBuilder.build()
141+
let modelSyncedEventNotMatchPayload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
142+
data: modelSyncedEventNotMatch)
143+
Amplify.Hub.dispatch(to: .dataStore, payload: modelSyncedEventNotMatchPayload)
144+
wait(for: [thirdSnapshot], timeout: 10)
145+
146+
XCTAssertEqual(querySnapshots[0].items.count, 0)
147+
XCTAssertEqual(querySnapshots[0].isSynced, false)
148+
XCTAssertEqual(querySnapshots[1].isSynced, true)
149+
sink.cancel()
150+
}
151+
152+
/// ObserveQuery will send the first snapshot with 2 items when storage engine
153+
/// is mocked to return 2 items.
154+
///
155+
/// - Given: ObserveQuery starts
156+
/// - When:
157+
/// - ObserveQuery performs the initial query, two posts are queried through StorageEngine
158+
/// - Then:
159+
/// - The items queried will return two posts in the first snapshot
160+
///
161+
func testFirstSnapshotFromStorageQueryReturnsTwoPosts() throws {
162+
let firstSnapshot = expectation(description: "firstSnapshot received")
163+
164+
var snapshots = [DataStoreQuerySnapshot<Post>]()
165+
let dispatchedModelSyncedEvent = AtomicValue(initialValue: false)
166+
let operation = AWSDataStoreObserveQueryOperation(
167+
modelType: Post.self,
168+
modelSchema: Post.schema,
169+
predicate: nil,
170+
sortInput: nil,
171+
storageEngine: storageEngine,
172+
dataStorePublisher: dataStorePublisher,
173+
dataStoreConfiguration: .default,
174+
dispatchedModelSyncedEvent: dispatchedModelSyncedEvent)
175+
let post = Post(title: "model1",
176+
content: "content1",
177+
createdAt: .now())
178+
storageEngine.responders[.query] = QueryResponder<Post>(callback: { _ in
179+
return .success([post, post])
180+
})
181+
182+
let sink = operation.publisher.sink { completed in
183+
switch completed {
184+
case .finished:
185+
break
186+
case .failure(let error):
187+
XCTFail("Failed with error \(error)")
188+
}
189+
} receiveValue: { querySnapshot in
190+
snapshots.append(querySnapshot)
191+
if snapshots.count == 1 {
192+
XCTAssertEqual(querySnapshot.items.count, 2)
193+
firstSnapshot.fulfill()
194+
}
195+
}
196+
let queue = OperationQueue()
197+
queue.addOperation(operation)
198+
wait(for: [firstSnapshot], timeout: 10)
199+
sink.cancel()
200+
}
201+
80202
/// Multiple item changed observed will be returned in a single snapshot
81203
///
82204
/// - Given: The operation has started and the first query has completed.

0 commit comments

Comments
 (0)