@@ -27,6 +27,7 @@ import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
2727import { Pipeline } from '../api/pipeline' ;
2828import { PipelineSnapshot } from '../api/snapshot' ;
2929import { PipelineResultView } from './sync_engine_impl' ;
30+ import { canonifyPipeline , pipelineEq } from './pipeline-util' ;
3031
3132/**
3233 * Holds the listeners and the last received ViewSnapshot for a query being
@@ -50,6 +51,12 @@ export interface Observer<T> {
5051 error : EventHandler < FirestoreError > ;
5152}
5253
54+ export type QueryOrPipeline = Query | Pipeline ;
55+
56+ export function isPipeline ( q : QueryOrPipeline ) : q is Pipeline {
57+ return q instanceof Pipeline ;
58+ }
59+
5360/**
5461 * EventManager is responsible for mapping queries to query event emitters.
5562 * It handles "fan-out". -- Identical queries will re-use the same watch on the
@@ -61,14 +68,15 @@ export interface Observer<T> {
6168 */
6269export interface EventManager {
6370 onListen ?: (
64- query : Query ,
71+ query : QueryOrPipeline ,
6572 enableRemoteListen : boolean
6673 ) => Promise < ViewSnapshot > ;
67- onUnlisten ?: ( query : Query , disableRemoteListen : boolean ) => Promise < void > ;
68- onFirstRemoteStoreListen ?: ( query : Query ) => Promise < void > ;
69- onLastRemoteStoreUnlisten ?: ( query : Query ) => Promise < void > ;
70- // TODO(pipeline): consolidate query and pipeline
71- onListenPipeline ?: ( pipeline : PipelineListener ) => Promise < void > ;
74+ onUnlisten ?: (
75+ query : QueryOrPipeline ,
76+ disableRemoteListen : boolean
77+ ) => Promise < void > ;
78+ onFirstRemoteStoreListen ?: ( query : QueryOrPipeline ) => Promise < void > ;
79+ onLastRemoteStoreUnlisten ?: ( query : QueryOrPipeline ) => Promise < void > ;
7280 terminate ( ) : void ;
7381}
7482
@@ -77,31 +85,34 @@ export function newEventManager(): EventManager {
7785}
7886
7987export class EventManagerImpl implements EventManager {
80- queries : ObjectMap < Query , QueryListenersInfo > = newQueriesObjectMap ( ) ;
88+ queries : ObjectMap < QueryOrPipeline , QueryListenersInfo > =
89+ newQueriesObjectMap ( ) ;
8190
8291 onlineState : OnlineState = OnlineState . Unknown ;
8392
8493 snapshotsInSyncListeners : Set < Observer < void > > = new Set ( ) ;
8594
8695 /** Callback invoked when a Query is first listen to. */
8796 onListen ?: (
88- query : Query ,
97+ query : QueryOrPipeline ,
8998 enableRemoteListen : boolean
9099 ) => Promise < ViewSnapshot > ;
91100 /** Callback invoked once all listeners to a Query are removed. */
92- onUnlisten ?: ( query : Query , disableRemoteListen : boolean ) => Promise < void > ;
93- onListenPipeline ?: ( pipeline : PipelineListener ) => Promise < void > ;
101+ onUnlisten ?: (
102+ query : QueryOrPipeline ,
103+ disableRemoteListen : boolean
104+ ) => Promise < void > ;
94105
95106 /**
96107 * Callback invoked when a Query starts listening to the remote store, while
97108 * already listening to the cache.
98109 */
99- onFirstRemoteStoreListen ?: ( query : Query ) => Promise < void > ;
110+ onFirstRemoteStoreListen ?: ( query : QueryOrPipeline ) => Promise < void > ;
100111 /**
101112 * Callback invoked when a Query stops listening to the remote store, while
102113 * still listening to the cache.
103114 */
104- onLastRemoteStoreUnlisten ?: ( query : Query ) => Promise < void > ;
115+ onLastRemoteStoreUnlisten ?: ( query : QueryOrPipeline ) => Promise < void > ;
105116
106117 terminate ( ) : void {
107118 errorAllTargets (
@@ -111,10 +122,43 @@ export class EventManagerImpl implements EventManager {
111122 }
112123}
113124
114- function newQueriesObjectMap ( ) : ObjectMap < Query , QueryListenersInfo > {
115- return new ObjectMap < Query , QueryListenersInfo > (
116- q => canonifyQuery ( q ) ,
117- queryEquals
125+ export function stringifyQueryOrPipeline ( q : QueryOrPipeline ) : string {
126+ if ( isPipeline ( q ) ) {
127+ return canonifyPipeline ( q ) ;
128+ }
129+
130+ return stringifyQuery ( q ) ;
131+ }
132+
133+ export function canonifyQueryOrPipeline ( q : QueryOrPipeline ) : string {
134+ if ( isPipeline ( q ) ) {
135+ return canonifyPipeline ( q ) ;
136+ }
137+
138+ return canonifyQuery ( q ) ;
139+ }
140+
141+ export function queryOrPipelineEqual (
142+ left : QueryOrPipeline ,
143+ right : QueryOrPipeline
144+ ) : boolean {
145+ if ( left instanceof Pipeline && right instanceof Pipeline ) {
146+ return pipelineEq ( left , right ) ;
147+ }
148+ if (
149+ ( left instanceof Pipeline && ! ( right instanceof Pipeline ) ) ||
150+ ( ! ( left instanceof Pipeline ) && right instanceof Pipeline )
151+ ) {
152+ return false ;
153+ }
154+
155+ return queryEquals ( left as Query , right as Query ) ;
156+ }
157+
158+ function newQueriesObjectMap ( ) : ObjectMap < QueryOrPipeline , QueryListenersInfo > {
159+ return new ObjectMap < QueryOrPipeline , QueryListenersInfo > (
160+ q => canonifyQueryOrPipeline ( q ) ,
161+ queryOrPipelineEqual
118162 ) ;
119163}
120164
@@ -129,7 +173,6 @@ function validateEventManager(eventManagerImpl: EventManagerImpl): void {
129173 ! ! eventManagerImpl . onLastRemoteStoreUnlisten ,
130174 'onLastRemoteStoreUnlisten not set'
131175 ) ;
132- debugAssert ( ! ! eventManagerImpl . onListenPipeline , 'onListenPipeline not set' ) ;
133176}
134177
135178const enum ListenerSetupAction {
@@ -194,7 +237,11 @@ export async function eventManagerListen(
194237 } catch ( e ) {
195238 const firestoreError = wrapInUserErrorIfRecoverable (
196239 e as Error ,
197- `Initialization of query '${ stringifyQuery ( listener . query ) } ' failed`
240+ `Initialization of query '${
241+ isPipeline ( listener . query )
242+ ? canonifyPipeline ( listener . query )
243+ : stringifyQuery ( listener . query )
244+ } ' failed`
198245 ) ;
199246 listener . onError ( firestoreError ) ;
200247 return ;
@@ -220,25 +267,6 @@ export async function eventManagerListen(
220267 }
221268}
222269
223- export async function eventManagerListenPipeline (
224- eventManager : EventManager ,
225- listener : PipelineListener
226- ) : Promise < void > {
227- const eventManagerImpl = debugCast ( eventManager , EventManagerImpl ) ;
228- validateEventManager ( eventManagerImpl ) ;
229-
230- try {
231- await eventManagerImpl . onListenPipeline ! ( listener ) ;
232- } catch ( e ) {
233- const firestoreError = wrapInUserErrorIfRecoverable (
234- e as Error ,
235- `Initialization of query '${ listener . pipeline } ' failed`
236- ) ;
237- listener . onError ( firestoreError ) ;
238- return ;
239- }
240- }
241-
242270export async function eventManagerUnlisten (
243271 eventManager : EventManager ,
244272 listener : QueryListener
@@ -312,13 +340,6 @@ export function eventManagerOnWatchChange(
312340 }
313341}
314342
315- export function eventManagerOnPipelineWatchChange (
316- eventManager : EventManager ,
317- viewSnaps : PipelineResultView [ ]
318- ) : void {
319- const eventManagerImpl = debugCast ( eventManager , EventManagerImpl ) ;
320- }
321-
322343export function eventManagerOnWatchError (
323344 eventManager : EventManager ,
324345 query : Query ,
@@ -445,7 +466,7 @@ export class QueryListener {
445466 private onlineState = OnlineState . Unknown ;
446467
447468 constructor (
448- readonly query : Query ,
469+ readonly query : QueryOrPipeline ,
449470 private queryObserver : Observer < ViewSnapshot > ,
450471 options ?: ListenOptions
451472 ) {
0 commit comments