Skip to content

Commit 2a2b592

Browse files
committed
quick hack to integrate with watch.
1 parent 405c869 commit 2a2b592

File tree

10 files changed

+333
-20
lines changed

10 files changed

+333
-20
lines changed

packages/firestore/src/api/pipeline.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
1-
import { firestoreClientExecutePipeline } from '../core/firestore_client';
1+
import {
2+
firestoreClientExecutePipeline,
3+
firestoreClientListenPipeline
4+
} from '../core/firestore_client';
25
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
36
import { PipelineResult } from '../lite-api/pipeline-result';
47
import { DocumentData, DocumentReference } from '../lite-api/reference';
5-
import { Stage } from '../lite-api/stage';
8+
import { AddFields, Stage } from '../lite-api/stage';
69
import { UserDataReader } from '../lite-api/user_data_reader';
710
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';
811
import { DocumentKey } from '../model/document_key';
912

1013
import { ensureFirestoreConfigured, Firestore } from './database';
14+
import { DocumentSnapshot, PipelineSnapshot } from './snapshot';
15+
import { FirestoreError } from '../util/error';
16+
import { Unsubscribe } from './reference_impl';
17+
import { cast } from '../util/input_validation';
1118

1219
export class Pipeline<
1320
AppModelType = DocumentData
@@ -94,4 +101,29 @@ export class Pipeline<
94101
return docs;
95102
});
96103
}
104+
105+
/**
106+
* @internal
107+
* @private
108+
*/
109+
_onSnapshot(observer: {
110+
next?: (snapshot: PipelineSnapshot) => void;
111+
error?: (error: FirestoreError) => void;
112+
complete?: () => void;
113+
}): Unsubscribe {
114+
this.stages.push(
115+
new AddFields(
116+
this.selectablesToMap([
117+
'__name__',
118+
'__create_time__',
119+
'__update_time__'
120+
])
121+
)
122+
);
123+
124+
const client = ensureFirestoreConfigured(this.db);
125+
firestoreClientListenPipeline(client, this, observer);
126+
127+
return () => {};
128+
}
97129
}

packages/firestore/src/api/snapshot.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import { Code, FirestoreError } from '../util/error';
4040

4141
import { Firestore } from './database';
4242
import { SnapshotListenOptions } from './reference_impl';
43+
import { Pipeline } from './pipeline';
44+
import { PipelineResult } from '../lite-api/pipeline-result';
4345

4446
/**
4547
* Converter used by `withConverter()` to transform user objects of type
@@ -790,3 +792,37 @@ export function snapshotEqual<AppModelType, DbModelType extends DocumentData>(
790792

791793
return false;
792794
}
795+
796+
export class PipelineSnapshot<AppModelType = DocumentData> {
797+
/**
798+
* Metadata about this snapshot, concerning its source and if it has local
799+
* modifications.
800+
*/
801+
readonly metadata: SnapshotMetadata;
802+
803+
/**
804+
* The query on which you called `get` or `onSnapshot` in order to get this
805+
* `QuerySnapshot`.
806+
*/
807+
readonly pipeline: Pipeline<AppModelType>;
808+
809+
/** @hideconstructor */
810+
constructor(
811+
readonly _firestore: Firestore,
812+
readonly _userDataWriter: AbstractUserDataWriter,
813+
pipeline: Pipeline<AppModelType>,
814+
readonly _snapshot: ViewSnapshot
815+
) {
816+
this.metadata = new SnapshotMetadata(
817+
_snapshot.hasPendingWrites,
818+
_snapshot.fromCache
819+
);
820+
this.pipeline = pipeline;
821+
}
822+
823+
/** An array of all the documents in the `QuerySnapshot`. */
824+
get results(): Array<PipelineResult<AppModelType>> {
825+
const result: Array<PipelineResult<AppModelType>> = [];
826+
return result;
827+
}
828+
}

packages/firestore/src/core/event_manager.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import { ObjectMap } from '../util/obj_map';
2424
import { canonifyQuery, Query, queryEquals, stringifyQuery } from './query';
2525
import { OnlineState } from './types';
2626
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
27+
import { Pipeline } from '../api/pipeline';
28+
import { PipelineSnapshot } from '../api/snapshot';
29+
import { PipelineResultView } from './sync_engine_impl';
2730

2831
/**
2932
* Holds the listeners and the last received ViewSnapshot for a query being
@@ -64,6 +67,8 @@ export interface EventManager {
6467
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
6568
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
6669
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
70+
// TODO(pipeline): consolidate query and pipeline
71+
onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;
6772
terminate(): void;
6873
}
6974

@@ -85,6 +90,7 @@ export class EventManagerImpl implements EventManager {
8590
) => Promise<ViewSnapshot>;
8691
/** Callback invoked once all listeners to a Query are removed. */
8792
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
93+
onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;
8894

8995
/**
9096
* Callback invoked when a Query starts listening to the remote store, while
@@ -123,6 +129,7 @@ function validateEventManager(eventManagerImpl: EventManagerImpl): void {
123129
!!eventManagerImpl.onLastRemoteStoreUnlisten,
124130
'onLastRemoteStoreUnlisten not set'
125131
);
132+
debugAssert(!!eventManagerImpl.onListenPipeline, 'onListenPipeline not set');
126133
}
127134

128135
const enum ListenerSetupAction {
@@ -213,6 +220,25 @@ export async function eventManagerListen(
213220
}
214221
}
215222

223+
export async function eventManagerListenPipeline(
224+
eventManager: EventManager,
225+
listener: PipelineListener
226+
): Promise<void> {
227+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
228+
validateEventManager(eventManagerImpl);
229+
230+
try {
231+
await eventManagerImpl.onListenPipeline!(listener);
232+
} catch (e) {
233+
const firestoreError = wrapInUserErrorIfRecoverable(
234+
e as Error,
235+
`Initialization of query '${listener.pipeline}' failed`
236+
);
237+
listener.onError(firestoreError);
238+
return;
239+
}
240+
}
241+
216242
export async function eventManagerUnlisten(
217243
eventManager: EventManager,
218244
listener: QueryListener
@@ -286,6 +312,13 @@ export function eventManagerOnWatchChange(
286312
}
287313
}
288314

315+
export function eventManagerOnPipelineWatchChange(
316+
eventManager: EventManager,
317+
viewSnaps: PipelineResultView[]
318+
): void {
319+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
320+
}
321+
289322
export function eventManagerOnWatchError(
290323
eventManager: EventManager,
291324
query: Query,
@@ -567,3 +600,21 @@ export class QueryListener {
567600
return this.options.source !== ListenerDataSource.Cache;
568601
}
569602
}
603+
604+
export class PipelineListener {
605+
private snap: PipelineResultView | null = null;
606+
607+
constructor(
608+
readonly pipeline: Pipeline,
609+
private queryObserver: Observer<PipelineSnapshot>
610+
) {}
611+
612+
onViewSnapshot(snap: PipelineResultView): boolean {
613+
this.snap = snap;
614+
return true;
615+
}
616+
617+
onError(error: FirestoreError): void {
618+
this.queryObserver.error(error);
619+
}
620+
}

packages/firestore/src/core/firestore_client.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import {
2323
CredentialsProvider
2424
} from '../api/credentials';
2525
import { User } from '../auth/user';
26-
import { Pipeline } from '../lite-api/pipeline';
26+
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
27+
import { Pipeline } from '../api/pipeline';
2728
import { LocalStore } from '../local/local_store';
2829
import {
2930
localStoreConfigureFieldIndexes,
@@ -79,16 +80,19 @@ import {
7980
addSnapshotsInSyncListener,
8081
EventManager,
8182
eventManagerListen,
83+
eventManagerListenPipeline,
8284
eventManagerUnlisten,
8385
ListenOptions,
8486
Observer,
87+
PipelineListener,
8588
QueryListener,
8689
removeSnapshotsInSyncListener
8790
} from './event_manager';
8891
import { newQueryForPath, Query } from './query';
8992
import { SyncEngine } from './sync_engine';
9093
import {
9194
syncEngineListen,
95+
syncEngineListenPipeline,
9296
syncEngineLoadBundle,
9397
syncEngineRegisterPendingWritesCallback,
9498
syncEngineUnlisten,
@@ -101,6 +105,8 @@ import { TransactionOptions } from './transaction_options';
101105
import { TransactionRunner } from './transaction_runner';
102106
import { View } from './view';
103107
import { ViewSnapshot } from './view_snapshot';
108+
import { Unsubscribe } from '../api/reference_impl';
109+
import { PipelineSnapshot } from '../api/snapshot';
104110

105111
const LOG_TAG = 'FirestoreClient';
106112
export const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
@@ -404,6 +410,10 @@ export async function getEventManager(
404410
null,
405411
onlineComponentProvider.syncEngine
406412
);
413+
eventManager.onListenPipeline = syncEngineListenPipeline.bind(
414+
null,
415+
onlineComponentProvider.syncEngine
416+
);
407417
return eventManager;
408418
}
409419

@@ -556,7 +566,7 @@ export function firestoreClientRunAggregateQuery(
556566

557567
export function firestoreClientExecutePipeline(
558568
client: FirestoreClient,
559-
pipeline: Pipeline
569+
pipeline: LitePipeline
560570
): Promise<PipelineStreamElement[]> {
561571
const deferred = new Deferred<PipelineStreamElement[]>();
562572

@@ -571,6 +581,27 @@ export function firestoreClientExecutePipeline(
571581
return deferred.promise;
572582
}
573583

584+
export function firestoreClientListenPipeline(
585+
client: FirestoreClient,
586+
pipeline: Pipeline,
587+
observer: {
588+
next?: (snapshot: PipelineSnapshot) => void;
589+
error?: (error: FirestoreError) => void;
590+
complete?: () => void;
591+
}
592+
): Unsubscribe {
593+
const wrappedObserver = new AsyncObserver(observer);
594+
const listener = new PipelineListener(pipeline, wrappedObserver);
595+
client.asyncQueue.enqueueAndForget(async () => {
596+
const eventManager = await getEventManager(client);
597+
return eventManagerListenPipeline(eventManager, listener);
598+
});
599+
return () => {
600+
wrappedObserver.mute();
601+
// TODO(pipeline): actually unlisten
602+
};
603+
}
604+
574605
export function firestoreClientWrite(
575606
client: FirestoreClient,
576607
mutations: Mutation[]

packages/firestore/src/core/query.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import {
3535
Target,
3636
targetEquals
3737
} from './target';
38+
import { Pipeline } from '../api/pipeline';
3839

3940
export const enum LimitType {
4041
First = 'F',

0 commit comments

Comments
 (0)