@@ -121,6 +121,7 @@ import { ViewSnapshot } from './view_snapshot';
121121import { Pipeline } from '../api/pipeline' ;
122122import { PipelineSnapshot } from '../api/snapshot' ;
123123import { PipelineResult } from '../lite-api/pipeline-result' ;
124+ import { doc } from '../lite-api/reference' ;
124125
125126const LOG_TAG = 'SyncEngine' ;
126127
@@ -150,9 +151,12 @@ class QueryView {
150151}
151152
152153export class PipelineResultView {
153- private keyToIndexMap : Map < DocumentKey , number > ;
154+ private keyToIndexMap : ObjectMap < DocumentKey , number > ;
154155 constructor ( public pipeline : Pipeline , public view : Array < MutableDocument > ) {
155- this . keyToIndexMap = new Map < DocumentKey , number > ( ) ;
156+ this . keyToIndexMap = new ObjectMap < DocumentKey , number > (
157+ key => key . toString ( ) ,
158+ ( a , b ) => a . isEqual ( b )
159+ ) ;
156160 this . buildKeyToIndexMap ( ) ;
157161 }
158162
@@ -197,6 +201,23 @@ export class PipelineResultView {
197201 }
198202 this . view [ index ] = doc ;
199203 }
204+
205+ toPipelineSnapshot ( ) : PipelineSnapshot {
206+ return new PipelineSnapshot (
207+ this . pipeline ,
208+ this . view . map (
209+ d =>
210+ new PipelineResult (
211+ this . pipeline . userDataWriter ,
212+ doc ( this . pipeline . db , d . key . toString ( ) ) ,
213+ d . data ,
214+ d . readTime . toTimestamp ( ) ,
215+ d . createTime . toTimestamp ( ) ,
216+ d . version . toTimestamp ( )
217+ )
218+ )
219+ ) ;
220+ }
200221}
201222
202223/** Tracks a limbo resolution. */
@@ -1000,7 +1021,10 @@ function removeAndCleanupTarget(
10001021 syncEngineImpl . sharedClientState . removeLocalQueryTarget ( targetId ) ;
10011022
10021023 // TODO(pipeline): REMOVE this hack.
1003- if ( ! syncEngineImpl . queriesByTarget . has ( targetId ) || syncEngineImpl . queriesByTarget . get ( targetId ) ! . length !== 0 ) {
1024+ if (
1025+ ! syncEngineImpl . queriesByTarget . has ( targetId ) ||
1026+ syncEngineImpl . queriesByTarget . get ( targetId ) ! . length !== 0
1027+ ) {
10041028 return ;
10051029 }
10061030
@@ -1174,7 +1198,10 @@ export async function syncEngineEmitNewSnapsAndNotifyLocalStore(
11741198 const change = remoteEvent ?. targetChanges . get ( targetId ) ;
11751199 if ( ! ! change ) {
11761200 change . modifiedDocuments . forEach ( key => {
1177- results . updateResult ( key , remoteEvent ?. augmentedDocumentUpdates . get ( key ) ! ) ;
1201+ results . updateResult (
1202+ key ,
1203+ remoteEvent ?. augmentedDocumentUpdates . get ( key ) !
1204+ ) ;
11781205 } ) ;
11791206 change . addedDocuments . forEach ( key => {
11801207 results . addResult ( key , remoteEvent ?. augmentedDocumentUpdates . get ( key ) ! ) ;
@@ -1319,17 +1346,21 @@ export function syncEngineGetRemoteKeysForTarget(
13191346 } else {
13201347 let keySet = documentKeySet ( ) ;
13211348 const queries = syncEngineImpl . queriesByTarget . get ( targetId ) ;
1322- if ( ! queries ) {
1349+ const pipelineView = syncEngineImpl . pipelineViewByTarget . get ( targetId ) ;
1350+ if ( ! queries && ! pipelineView ) {
13231351 return keySet ;
13241352 }
1325- for ( const query of queries ) {
1353+ for ( const query of queries ?? [ ] ) {
13261354 const queryView = syncEngineImpl . queryViewsByQuery . get ( query ) ;
13271355 debugAssert (
13281356 ! ! queryView ,
13291357 `No query view found for ${ stringifyQuery ( query ) } `
13301358 ) ;
13311359 keySet = keySet . unionWith ( queryView . view . syncedDocuments ) ;
13321360 }
1361+ for ( const doc of pipelineView ?. view ?? [ ] ) {
1362+ keySet = keySet . add ( doc . key ) ;
1363+ }
13331364 return keySet ;
13341365 }
13351366}
0 commit comments