Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

2 changes: 1 addition & 1 deletion packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import {
connectFirestoreEmulator,
Firestore as LiteFirestore
} from '../lite-api/database';
import { PipelineSource } from '../lite-api/pipeline-source';
import { PipelineSource } from './pipeline_source';
import { DocumentReference, Query } from '../lite-api/reference';
import { newUserDataReader } from '../lite-api/user_data_reader';
import {
Expand Down
56 changes: 53 additions & 3 deletions packages/firestore/src/api/pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import { firestoreClientExecutePipeline } from '../core/firestore_client';
import {
firestoreClientExecutePipeline,
firestoreClientListenPipeline
} from '../core/firestore_client';
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
import { PipelineResult } from '../lite-api/pipeline-result';
import { DocumentData, DocumentReference } from '../lite-api/reference';
import { Stage } from '../lite-api/stage';
import { AddFields, Sort, Stage, Where } from '../lite-api/stage';
import { UserDataReader } from '../lite-api/user_data_reader';
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';
import { DocumentKey } from '../model/document_key';

import { ensureFirestoreConfigured, Firestore } from './database';
import { DocumentSnapshot, PipelineSnapshot } from './snapshot';
import { FirestoreError } from '../util/error';
import { Unsubscribe } from './reference_impl';
import { cast } from '../util/input_validation';
import { Field, FilterCondition } from '../api';
import { Expr } from '../lite-api/expressions';

export class Pipeline<
AppModelType = DocumentData
Expand All @@ -23,7 +32,7 @@ export class Pipeline<
* @param converter
*/
constructor(
private db: Firestore,
readonly db: Firestore,
userDataReader: UserDataReader,
userDataWriter: AbstractUserDataWriter,
documentReferenceFactory: (id: DocumentKey) => DocumentReference,
Expand All @@ -42,6 +51,20 @@ export class Pipeline<
);
}

where(condition: FilterCondition & Expr): Pipeline<AppModelType> {
const copy = this.stages.map(s => s);
super.readUserData('where', condition);
copy.push(new Where(condition));
return new Pipeline(
this.db,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
copy,
this.converter
);
}

/**
* Executes this pipeline and returns a Promise to represent the asynchronous operation.
*
Expand Down Expand Up @@ -94,4 +117,31 @@ export class Pipeline<
return docs;
});
}

/**
* @internal
* @private
*/
_onSnapshot(
next: (snapshot: PipelineSnapshot) => void,
error?: (error: FirestoreError) => void,
complete?: () => void
): Unsubscribe {
// this.stages.push(
// new AddFields(
// this.selectablesToMap([
// '__name__',
// '__create_time__',
// '__update_time__'
// ])
// )
// );

this.stages.push(new Sort([Field.of('__name__').ascending()]));

const client = ensureFirestoreConfigured(this.db);
firestoreClientListenPipeline(client, this, { next, error, complete });

return () => {};
}
}
91 changes: 91 additions & 0 deletions packages/firestore/src/api/pipeline_source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import { DocumentKey } from '../model/document_key';

import { Firestore } from './database';
import { Pipeline } from './pipeline';
import { DocumentReference } from './reference';
import {
CollectionGroupSource,
CollectionSource,
DatabaseSource,
DocumentsSource
} from '../lite-api/stage';
import { PipelineSource as LitePipelineSource } from '../lite-api/pipeline-source';
import { UserDataReader } from '../lite-api/user_data_reader';
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';

/**
* Represents the source of a Firestore {@link Pipeline}.
* @beta
*/
export class PipelineSource extends LitePipelineSource {
/**
* @internal
* @private
* @param db
* @param userDataReader
* @param userDataWriter
* @param documentReferenceFactory
*/
constructor(
db: Firestore,
userDataReader: UserDataReader,
userDataWriter: AbstractUserDataWriter,
documentReferenceFactory: (id: DocumentKey) => DocumentReference
) {
super(db, userDataReader, userDataWriter, documentReferenceFactory);
}

collection(collectionPath: string): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new CollectionSource(collectionPath)]
);
}

collectionGroup(collectionId: string): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new CollectionGroupSource(collectionId)]
);
}

database(): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new DatabaseSource()]
);
}

documents(docs: DocumentReference[]): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[DocumentsSource.of(docs)]
);
}
}
23 changes: 23 additions & 0 deletions packages/firestore/src/api/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import { Code, FirestoreError } from '../util/error';

import { Firestore } from './database';
import { SnapshotListenOptions } from './reference_impl';
import { Pipeline } from './pipeline';
import { PipelineResult } from '../lite-api/pipeline-result';

/**
* Converter used by `withConverter()` to transform user objects of type
Expand Down Expand Up @@ -790,3 +792,24 @@ export function snapshotEqual<AppModelType, DbModelType extends DocumentData>(

return false;
}

export class PipelineSnapshot<AppModelType = DocumentData> {
/**
* The query on which you called `get` or `onSnapshot` in order to get this
* `QuerySnapshot`.
*/
readonly pipeline: Pipeline<AppModelType>;

/** @hideconstructor */
constructor(
pipeline: Pipeline<AppModelType>,
readonly _snapshot: PipelineResult<AppModelType>[]
) {
this.pipeline = pipeline;
}

/** An array of all the documents in the `QuerySnapshot`. */
get results(): Array<PipelineResult<AppModelType>> {
return this._snapshot;
}
}
52 changes: 52 additions & 0 deletions packages/firestore/src/core/event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import { ObjectMap } from '../util/obj_map';
import { canonifyQuery, Query, queryEquals, stringifyQuery } from './query';
import { OnlineState } from './types';
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
import { Pipeline } from '../api/pipeline';
import { PipelineSnapshot } from '../api/snapshot';
import { PipelineResultView } from './sync_engine_impl';

/**
* Holds the listeners and the last received ViewSnapshot for a query being
Expand Down Expand Up @@ -64,6 +67,8 @@ export interface EventManager {
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
// TODO(pipeline): consolidate query and pipeline
onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;
terminate(): void;
}

Expand All @@ -85,6 +90,7 @@ export class EventManagerImpl implements EventManager {
) => Promise<ViewSnapshot>;
/** Callback invoked once all listeners to a Query are removed. */
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;

/**
* Callback invoked when a Query starts listening to the remote store, while
Expand Down Expand Up @@ -123,6 +129,7 @@ function validateEventManager(eventManagerImpl: EventManagerImpl): void {
!!eventManagerImpl.onLastRemoteStoreUnlisten,
'onLastRemoteStoreUnlisten not set'
);
debugAssert(!!eventManagerImpl.onListenPipeline, 'onListenPipeline not set');
}

const enum ListenerSetupAction {
Expand Down Expand Up @@ -213,6 +220,25 @@ export async function eventManagerListen(
}
}

export async function eventManagerListenPipeline(
eventManager: EventManager,
listener: PipelineListener
): Promise<void> {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
validateEventManager(eventManagerImpl);

try {
await eventManagerImpl.onListenPipeline!(listener);
} catch (e) {
const firestoreError = wrapInUserErrorIfRecoverable(
e as Error,
`Initialization of query '${listener.pipeline}' failed`
);
listener.onError(firestoreError);
return;
}
}

export async function eventManagerUnlisten(
eventManager: EventManager,
listener: QueryListener
Expand Down Expand Up @@ -286,6 +312,13 @@ export function eventManagerOnWatchChange(
}
}

export function eventManagerOnPipelineWatchChange(
eventManager: EventManager,
viewSnaps: PipelineResultView[]
): void {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
}

export function eventManagerOnWatchError(
eventManager: EventManager,
query: Query,
Expand Down Expand Up @@ -567,3 +600,22 @@ export class QueryListener {
return this.options.source !== ListenerDataSource.Cache;
}
}

export class PipelineListener {
private view: PipelineResultView | null = null;

constructor(
readonly pipeline: Pipeline,
private queryObserver: Observer<PipelineSnapshot>
) {}

onViewSnapshot(view: PipelineResultView): boolean {
this.view = view;
this.queryObserver.next(view.toPipelineSnapshot());
return true;
}

onError(error: FirestoreError): void {
this.queryObserver.error(error);
}
}
Loading
Loading