Skip to content

Commit 1994e14

Browse files
authored
fix(datastore): observequery local only collect (#3214)
1 parent d3af5f9 commit 1994e14

File tree

4 files changed

+26
-1
lines changed

4 files changed

+26
-1
lines changed

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ extension StorageEngine {
7474
}
7575
}
7676

77+
/// Expresses whether the `StorageEngine` syncs from a remote source
78+
/// based on whether the `AWSAPIPlugin` is present.
79+
var syncsFromRemote: Bool {
80+
tryGetAPIPlugin() != nil
81+
}
82+
7783
private func tryGetAPIPlugin() -> APICategoryPlugin? {
7884
do {
7985
return try Amplify.API.getPlugin(for: validAPIPluginKey)

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineBehavior.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,7 @@ protocol StorageEngineBehavior: AnyObject, ModelStorageBehavior {
3131
func startSync() -> Result<SyncEngineInitResult, DataStoreError>
3232
func stopSync(completion: @escaping DataStoreCallback<Void>)
3333
func clear(completion: @escaping DataStoreCallback<Void>)
34+
35+
/// expresses whether the conforming type is syncing from a remote source.
36+
var syncsFromRemote: Bool { get }
3437
}

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Subscribe/DataStoreObserveQueryOperation.swift

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,26 @@ class ObserveQueryTaskRunner<M: Model>: InternalTaskRunner, InternalTaskAsyncThr
222222
func subscribeToItemChanges() {
223223
serialQueue.async { [weak self] in
224224
guard let self = self else { return }
225+
225226
self.batchItemsChangedSink = self.dataStorePublisher.publisher
226227
.filter { _ in !self.dispatchedModelSyncedEvent.get() }
227228
.filter(self.filterByModelName(mutationEvent:))
228229
.filter(self.filterByPredicateMatch(mutationEvent:))
229230
.handleEvents(receiveOutput: self.onItemChangeDuringSync(mutationEvent:) )
230-
.collect(.byTimeOrCount(self.serialQueue, self.itemsChangedPeriodicPublishTimeInSeconds, self.itemsChangedMaxSize))
231+
.collect(
232+
.byTimeOrCount(
233+
// on queue
234+
self.serialQueue,
235+
// collect over this timeframe
236+
self.itemsChangedPeriodicPublishTimeInSeconds,
237+
// If the `storageEngine` does sync from remote, the initial batch should
238+
// collect snapshots based on time / snapshots received.
239+
// If it doesn't, it should publish each snapshot without waiting.
240+
self.storageEngine.syncsFromRemote
241+
? self.itemsChangedMaxSize
242+
: 1
243+
)
244+
)
231245
.sink(receiveCompletion: self.onReceiveCompletion(completed:),
232246
receiveValue: self.onItemsChangeDuringSync(mutationEvents:))
233247

AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,8 @@ class MockStorageEngineBehavior: StorageEngineBehavior {
301301

302302
}
303303

304+
var syncsFromRemote: Bool { true }
305+
304306
var mockSyncEnginePublisher: PassthroughSubject<RemoteSyncEngineEvent, DataStoreError>!
305307
var mockSyncEngineSubscription: AnyCancellable! {
306308
willSet {

0 commit comments

Comments
 (0)