@@ -45,6 +45,15 @@ import {
4545import { BloomFilter , BloomFilterError } from './bloom_filter' ;
4646import { ExistenceFilter } from './existence_filter' ;
4747import { RemoteEvent , TargetChange } from './remote_event' ;
48+ import {
49+ getPipelineDocuments ,
50+ getPipelineFlavor ,
51+ getPipelineSourceType ,
52+ isPipeline ,
53+ TargetOrPipeline
54+ } from '../core/pipeline-util' ;
55+ import { Pipeline } from '../lite-api/pipeline' ;
56+ import { ResourcePath } from '../model/path' ;
4857
4958/**
5059 * Internal representation of the watcher API protocol buffers.
@@ -405,6 +414,17 @@ export class WatchChangeAggregator {
405414 }
406415 }
407416
417+ isSingleDocumentTarget ( target : TargetOrPipeline ) : boolean {
418+ if ( targetIsPipelineTarget ( target ) ) {
419+ return (
420+ getPipelineSourceType ( target ) === 'documents' &&
421+ getPipelineDocuments ( target ) ?. length === 1
422+ ) ;
423+ }
424+
425+ return targetIsDocumentTarget ( target ) ;
426+ }
427+
408428 /**
409429 * Handles existence filters and synthesizes deletes for filter mismatches.
410430 * Targets that are invalidated by filter mismatches are added to
@@ -417,29 +437,7 @@ export class WatchChangeAggregator {
417437 const targetData = this . targetDataForActiveTarget ( targetId ) ;
418438 if ( targetData ) {
419439 const target = targetData . target ;
420- if ( targetIsPipelineTarget ( target ) ) {
421- //TODO(pipeline): handle existence filter correctly for pipelines
422- } else if ( targetIsDocumentTarget ( target ) ) {
423- if ( expectedCount === 0 ) {
424- // The existence filter told us the document does not exist. We deduce
425- // that this document does not exist and apply a deleted document to
426- // our updates. Without applying this deleted document there might be
427- // another query that will raise this document as part of a snapshot
428- // until it is resolved, essentially exposing inconsistency between
429- // queries.
430- const key = new DocumentKey ( target . path ) ;
431- this . removeDocumentFromTarget (
432- targetId ,
433- key ,
434- MutableDocument . newNoDocument ( key , SnapshotVersion . min ( ) )
435- ) ;
436- } else {
437- hardAssert (
438- expectedCount === 1 ,
439- 'Single document existence filter with count: ' + expectedCount
440- ) ;
441- }
442- } else {
440+ if ( ! this . isSingleDocumentTarget ( target ) ) {
443441 const currentSize = this . getCurrentDocumentCountForTarget ( targetId ) ;
444442 // Existence filter mismatch. Mark the documents as being in limbo, and
445443 // raise a snapshot with `isFromCache:true`.
@@ -474,6 +472,30 @@ export class WatchChangeAggregator {
474472 )
475473 ) ;
476474 }
475+ } else {
476+ if ( expectedCount === 0 ) {
477+ // The existence filter told us the document does not exist. We deduce
478+ // that this document does not exist and apply a deleted document to
479+ // our updates. Without applying this deleted document there might be
480+ // another query that will raise this document as part of a snapshot
481+ // until it is resolved, essentially exposing inconsistency between
482+ // queries.
483+ const key = new DocumentKey (
484+ targetIsPipelineTarget ( target )
485+ ? ResourcePath . fromString ( getPipelineDocuments ( target ) ! [ 0 ] )
486+ : target . path
487+ ) ;
488+ this . removeDocumentFromTarget (
489+ targetId ,
490+ key ,
491+ MutableDocument . newNoDocument ( key , SnapshotVersion . min ( ) )
492+ ) ;
493+ } else {
494+ hardAssert (
495+ expectedCount === 1 ,
496+ 'Single document existence filter with count: ' + expectedCount
497+ ) ;
498+ }
477499 }
478500 }
479501 }
@@ -591,8 +613,7 @@ export class WatchChangeAggregator {
591613 if ( targetData ) {
592614 if (
593615 targetState . current &&
594- ! targetIsPipelineTarget ( targetData . target ) &&
595- targetIsDocumentTarget ( targetData . target )
616+ this . isSingleDocumentTarget ( targetData . target )
596617 ) {
597618 // Document queries for document that don't exist can produce an empty
598619 // result set. To update our local cache, we synthesize a document
@@ -603,7 +624,12 @@ export class WatchChangeAggregator {
603624 // TODO(dimond): Ideally we would have an explicit lookup target
604625 // instead resulting in an explicit delete message and we could
605626 // remove this special logic.
606- const key = new DocumentKey ( targetData . target . path ) ;
627+ const path = targetIsPipelineTarget ( targetData . target )
628+ ? ResourcePath . fromString (
629+ getPipelineDocuments ( targetData . target ) ! [ 0 ]
630+ )
631+ : targetData . target . path ;
632+ const key = new DocumentKey ( path ) ;
607633 if (
608634 this . pendingDocumentUpdates . get ( key ) === null &&
609635 ! this . targetContainsDocument ( targetId , key )
@@ -695,7 +721,12 @@ export class WatchChangeAggregator {
695721 targetState . addDocumentChange ( document . key , changeType ) ;
696722
697723 if (
698- targetIsPipelineTarget ( this . targetDataForActiveTarget ( targetId ) ! . target )
724+ targetIsPipelineTarget (
725+ this . targetDataForActiveTarget ( targetId ) ! . target
726+ ) &&
727+ getPipelineFlavor (
728+ this . targetDataForActiveTarget ( targetId ) ! . target as Pipeline
729+ ) !== 'exact'
699730 ) {
700731 this . pendingAugmentedDocumentUpdates =
701732 this . pendingAugmentedDocumentUpdates . insert ( document . key , document ) ;
@@ -747,7 +778,12 @@ export class WatchChangeAggregator {
747778
748779 if ( updatedDocument ) {
749780 if (
750- targetIsPipelineTarget ( this . targetDataForActiveTarget ( targetId ) ! . target )
781+ targetIsPipelineTarget (
782+ this . targetDataForActiveTarget ( targetId ) ! . target
783+ ) &&
784+ getPipelineFlavor (
785+ this . targetDataForActiveTarget ( targetId ) ! . target as Pipeline
786+ ) !== 'exact'
751787 ) {
752788 this . pendingAugmentedDocumentUpdates =
753789 this . pendingAugmentedDocumentUpdates . insert ( key , updatedDocument ) ;
0 commit comments