@@ -234,9 +234,7 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
234
234
finish ( )
235
235
return
236
236
}
237
- if log. logLevel >= . debug {
238
- stopwatch. start ( )
239
- }
237
+ startSnapshotStopWatch ( )
240
238
storageEngine. query (
241
239
modelType,
242
240
modelSchema: modelSchema,
@@ -252,7 +250,7 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
252
250
switch queryResult {
253
251
case . success( let queriedModels) :
254
252
currentItems. sortedModels = queriedModels
255
- generateQuerySnapshot ( )
253
+ sendSnapshot ( )
256
254
case . failure( let error) :
257
255
self . passthroughPublisher. send ( completion: . failure( error) )
258
256
self . finish ( )
@@ -263,45 +261,50 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
263
261
264
262
// MARK: Observe item changes
265
263
264
+ /// Subscribe to item changes with two subscribers (During Sync and After Sync). During Sync, the items are filtered
265
+ /// by name and predicate, then collected by the timeOrCount grouping, before sent for processing the snapshot.
266
+ /// After Sync, the item is only filtered by name, and not the predicate filter because updates to the item may
267
+ /// make it so that the item no longer matches the predicate and requires to be removed from `currentItems`.
268
+ /// This check is defered until `onItemChangedAfterSync` where the predicate is then used, and `currentItems` is
269
+ /// accessed under the serial queue.
266
270
func subscribeToItemChanges( ) {
267
271
batchItemsChangedSink = dataStorePublisher. publisher
268
272
. filter { _ in !self . dispatchedModelSyncedEvent. get ( ) }
269
- . filter ( onItemChangedFilter ( mutationEvent: ) )
273
+ . filter ( filterByModelName ( mutationEvent: ) )
274
+ . filter ( filterByPredicateMatch ( mutationEvent: ) )
270
275
. collect ( . byTimeOrCount( serialQueue, itemsChangedPeriodicPublishTimeInSeconds, itemsChangedMaxSize) )
271
276
. sink ( receiveCompletion: onReceiveCompletion ( completed: ) ,
272
- receiveValue: onItemsChange ( mutationEvents: ) )
277
+ receiveValue: onItemsChangeDuringSync ( mutationEvents: ) )
273
278
274
279
itemsChangedSink = dataStorePublisher. publisher
275
280
. filter { _ in self . dispatchedModelSyncedEvent. get ( ) }
276
- . filter ( onItemChangedFilter ( mutationEvent: ) )
281
+ . filter ( filterByModelName ( mutationEvent: ) )
277
282
. receive ( on: serialQueue)
278
283
. sink ( receiveCompletion: onReceiveCompletion ( completed: ) ,
279
- receiveValue: onItemChange ( mutationEvent: ) )
284
+ receiveValue: onItemChangeAfterSync ( mutationEvent: ) )
280
285
}
281
286
282
- func onItemChangedFilter ( mutationEvent: MutationEvent ) -> Bool {
283
- guard mutationEvent . modelName == modelSchema . name else {
284
- return false
285
- }
287
+ func filterByModelName ( mutationEvent: MutationEvent ) -> Bool {
288
+ // Filter in the model when it matches the model name for this operation
289
+ mutationEvent . modelName == modelSchema . name
290
+ }
286
291
292
+ func filterByPredicateMatch( mutationEvent: MutationEvent ) -> Bool {
293
+ // Filter in the model when there is no predicate to check against.
287
294
guard let predicate = self . predicate else {
288
295
return true
289
296
}
290
-
291
297
do {
292
298
let model = try mutationEvent. decodeModel ( as: modelType)
299
+ // Filter in the model when the predicate matches, otherwise filter out
293
300
return predicate. evaluate ( target: model)
294
301
} catch {
295
302
log. error ( error: error)
296
303
return false
297
304
}
298
305
}
299
306
300
- func onItemChange( mutationEvent: MutationEvent ) {
301
- onItemsChange ( mutationEvents: [ mutationEvent] )
302
- }
303
-
304
- func onItemsChange( mutationEvents: [ MutationEvent ] ) {
307
+ func onItemsChangeDuringSync( mutationEvents: [ MutationEvent ] ) {
305
308
serialQueue. async {
306
309
if self . isCancelled || self . isFinished {
307
310
self . finish ( )
@@ -310,31 +313,77 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
310
313
guard self . observeQueryStarted, !mutationEvents. isEmpty else {
311
314
return
312
315
}
313
- if self . log . logLevel >= . debug {
314
- self . stopwatch . start ( )
315
- }
316
- self . generateQuerySnapshot ( itemsChanged : mutationEvents )
316
+
317
+ self . startSnapshotStopWatch ( )
318
+ self . apply ( itemsChanged : mutationEvents )
319
+ self . sendSnapshot ( )
317
320
}
318
321
}
319
322
320
- private func generateQuerySnapshot( itemsChanged: [ MutationEvent ] = [ ] ) {
321
- updateCurrentItems ( with: itemsChanged)
322
- passthroughPublisher. send ( currentSnapshot)
323
- if log. logLevel >= . debug {
324
- let time = stopwatch. stop ( )
325
- log. debug ( " Time to generate snapshot: \( time) seconds " )
323
+ // Item changes after sync is more elaborate than item changes during sync because the item was never filtered out
324
+ // by the predicate (unlike during sync). An item that no longer matches the predicate may already exist in the
325
+ // snapshot and now needs to be removed. The evaluation is done here under the serial queue since checking to
326
+ // remove the item requires that check on `currentItems` and is required to be performed under the serial queue.
327
+ func onItemChangeAfterSync( mutationEvent: MutationEvent ) {
328
+ serialQueue. async {
329
+ if self . isCancelled || self . isFinished {
330
+ self . finish ( )
331
+ return
332
+ }
333
+ guard self . observeQueryStarted else {
334
+ return
335
+ }
336
+ self . startSnapshotStopWatch ( )
337
+
338
+ do {
339
+ let model = try mutationEvent. decodeModel ( as: self . modelType)
340
+ guard let mutationType = MutationEvent . MutationType ( rawValue: mutationEvent. mutationType) else {
341
+ return
342
+ }
343
+
344
+ guard let predicate = self . predicate else {
345
+ // 1. If there is no predicate, this item should be applied to the snapshot
346
+ if self . currentItems. apply ( model: model, mutationType: mutationType) {
347
+ self . sendSnapshot ( )
348
+ }
349
+ return
350
+ }
351
+
352
+ // 2. When there is a predicate, evaluate further
353
+ let modelMatchesPredicate = predicate. evaluate ( target: model)
354
+
355
+ guard !modelMatchesPredicate else {
356
+ // 3. When the item matchs the predicate, the item should be applied to the snapshot
357
+ if self . currentItems. apply ( model: model, mutationType: mutationType) {
358
+ self . sendSnapshot ( )
359
+ }
360
+ return
361
+ }
362
+
363
+ // 4. When the item does not match the predicate, and is an update/delete, then the item needs to be
364
+ // removed from `currentItems` because it no longer should be in the snapshot. If removing it was
365
+ // was successfully, then send a new snapshot
366
+ if mutationType == . update || mutationType == . delete, self . currentItems. remove ( model) {
367
+ self . sendSnapshot ( )
368
+ }
369
+ } catch {
370
+ self . log. error ( error: error)
371
+ return
372
+ }
373
+
326
374
}
327
375
}
328
376
329
377
/// Update `curentItems` list with the changed items.
330
378
/// This method is not thread safe unless executed under the serial DispatchQueue `serialQueue`.
331
- private func updateCurrentItems ( with itemsChanged: [ MutationEvent ] ) {
379
+ private func apply ( itemsChanged: [ MutationEvent ] ) {
332
380
for item in itemsChanged {
333
381
do {
334
382
let model = try item. decodeModel ( as: modelType)
335
383
guard let mutationType = MutationEvent . MutationType ( rawValue: item. mutationType) else {
336
384
return
337
385
}
386
+
338
387
currentItems. apply ( model: model, mutationType: mutationType)
339
388
} catch {
340
389
log. error ( error: error)
@@ -343,6 +392,20 @@ public class AWSDataStoreObserveQueryOperation<M: Model>: AsynchronousOperation,
343
392
}
344
393
}
345
394
395
+ private func startSnapshotStopWatch( ) {
396
+ if log. logLevel >= . debug {
397
+ stopwatch. start ( )
398
+ }
399
+ }
400
+
401
+ private func sendSnapshot( ) {
402
+ passthroughPublisher. send ( currentSnapshot)
403
+ if log. logLevel >= . debug {
404
+ let time = stopwatch. stop ( )
405
+ log. debug ( " Time to generate snapshot: \( time) seconds " )
406
+ }
407
+ }
408
+
346
409
private func onReceiveCompletion( completed: Subscribers . Completion < DataStoreError > ) {
347
410
if isCancelled || isFinished {
348
411
finish ( )
0 commit comments