Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/firestore/lite/pipelines/pipelines.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ export {
neq,
lt,
countIf,
currentContext,
lte,
gt,
gte,
Expand Down
135 changes: 133 additions & 2 deletions packages/firestore/src/api/pipeline_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,41 @@
* limitations under the License.
*/

import { Pipeline } from '../api/pipeline';
import { firestoreClientExecutePipeline } from '../core/firestore_client';
// Re-adding necessary imports that were removed previously
import {
CompleteFn,
ErrorFn,
isPartialObserver,
NextFn,
PartialObserver
} from '../api/observer';
import {
firestoreClientExecutePipeline,
firestoreClientListen
} from '../core/firestore_client';
import { ListenerDataSource } from '../core/event_manager';

Check failure on line 30 in packages/firestore/src/api/pipeline_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`../core/event_manager` import should occur before import of `../core/firestore_client`
import { toCorePipeline } from '../core/pipeline-util';
import { ViewSnapshot } from '../core/view_snapshot';
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
import { PipelineResult, PipelineSnapshot } from '../lite-api/pipeline-result';
import { PipelineSource } from '../lite-api/pipeline-source';
import { Stage } from '../lite-api/stage';
import { newUserDataReader } from '../lite-api/user_data_reader';
import { FirestoreError } from '../util/error';
import { cast } from '../util/input_validation';

import { ensureFirestoreConfigured, Firestore } from './database';
import { Pipeline } from './pipeline'; // Keep this specific Pipeline import if needed alongside LitePipeline
import { RealtimePipeline } from './realtime_pipeline';
import { DocumentReference } from './reference';
import { SnapshotListenOptions, Unsubscribe } from './reference_impl';
import { RealtimePipelineSnapshot } from './snapshot';
import { ExpUserDataWriter } from './user_data_writer';

declare module './database' {
interface Firestore {
pipeline(): PipelineSource<Pipeline>;
realtimePipeline(): PipelineSource<RealtimePipeline>;
}
}

Expand Down Expand Up @@ -71,6 +90,7 @@
export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot> {
const firestore = cast(pipeline._db, Firestore);
const client = ensureFirestoreConfigured(firestore);

return firestoreClientExecutePipeline(client, pipeline).then(result => {
// Get the execution time from the first result.
// firestoreClientExecutePipeline returns at least one PipelineStreamElement
Expand All @@ -90,6 +110,7 @@
? new DocumentReference(firestore, null, element.key)
: undefined,
element.fields,
element.executionTime?.toTimestamp(),
element.createTime?.toTimestamp(),
element.updateTime?.toTimestamp()
)
Expand All @@ -110,3 +131,113 @@
);
});
};

Firestore.prototype.realtimePipeline =
function (): PipelineSource<RealtimePipeline> {
return new PipelineSource<RealtimePipeline>(
this._databaseId,
(stages: Stage[]) => {
return new RealtimePipeline(
this,
newUserDataReader(this),
new ExpUserDataWriter(this),
stages
);
}
);
};

/**
* @internal
* @private
*/
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
observer: {
next?: (snapshot: RealtimePipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
/**
* @internal
* @private
*/
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
options: SnapshotListenOptions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our design doc was updated to use PipelineListenOptions, which will add serverTimestamps config and remove source. Though, if we want to support source, we can. go/firestore-api-realtime-pipelines

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be refactored in a future PR, as long as we track it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More generally, we will need to put together a design for how pipeline options are passed to onSnapshot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I forgot about serverTimestamp here. Let's proceed without this for now. I added one item to the tracking sheet.

observer: {
next?: (snapshot: RealtimePipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
/**
* @internal
* @private
*/
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
onNext: (snapshot: RealtimePipelineSnapshot) => void,
onError?: (error: FirestoreError) => void,
onComplete?: () => void
): Unsubscribe;
/**
* @internal
* @private
*/
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
options: SnapshotListenOptions,
onNext: (snapshot: RealtimePipelineSnapshot) => void,
onError?: (error: FirestoreError) => void,
onComplete?: () => void
): Unsubscribe;
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
...args: unknown[]
): Unsubscribe {
let options: SnapshotListenOptions = {
includeMetadataChanges: false,
source: 'default'
};
let currArg = 0;
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
options = args[currArg] as SnapshotListenOptions;
currArg++;
}

const internalOptions = {
includeMetadataChanges: options.includeMetadataChanges,
source: options.source as ListenerDataSource
};

let userObserver: PartialObserver<RealtimePipelineSnapshot>;
if (isPartialObserver(args[currArg])) {
userObserver = args[currArg] as PartialObserver<RealtimePipelineSnapshot>;
} else {
userObserver = {
next: args[currArg] as NextFn<RealtimePipelineSnapshot>,
error: args[currArg + 1] as ErrorFn,
complete: args[currArg + 2] as CompleteFn
};
}

const client = ensureFirestoreConfigured(pipeline._db as Firestore);
const observer = {
next: (snapshot: ViewSnapshot) => {
if (userObserver.next) {
userObserver.next(new RealtimePipelineSnapshot(pipeline, snapshot));
}
},
error: userObserver.error,
complete: userObserver.complete
};

return firestoreClientListen(
client,
toCorePipeline(pipeline),
internalOptions, // Pass parsed options here
observer
);
}
108 changes: 104 additions & 4 deletions packages/firestore/src/api/reference_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
firestoreClientListen,
firestoreClientWrite
} from '../core/firestore_client';
import { newQueryForPath, Query as InternalQuery } from '../core/query';
import { QueryOrPipeline, toCorePipeline } from '../core/pipeline-util';

Check failure on line 37 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'QueryOrPipeline' is defined but never used

Check failure on line 37 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'QueryOrPipeline' is defined but never used. Allowed unused vars must match /^_/u
import { newQueryForPath } from '../core/query';
import { ViewSnapshot } from '../core/view_snapshot';
import { FieldPath } from '../lite-api/field_path';
import { validateHasExplicitOrderByForLimitToLast } from '../lite-api/query';
Expand Down Expand Up @@ -63,7 +64,13 @@
import { cast } from '../util/input_validation';

import { ensureFirestoreConfigured, Firestore } from './database';
import { DocumentSnapshot, QuerySnapshot, SnapshotMetadata } from './snapshot';
import { RealtimePipeline } from './realtime_pipeline';
import {
DocumentSnapshot,
QuerySnapshot,
RealtimePipelineSnapshot,
SnapshotMetadata
} from './snapshot';
import { ExpUserDataWriter } from './user_data_writer';

/**
Expand Down Expand Up @@ -190,6 +197,10 @@
*
* @returns A `Promise` that will be resolved with the results of the query.
*/
export function getDocs<AppModelType, DbModelType extends DocumentData>(
query: Query<AppModelType, DbModelType>
): Promise<QuerySnapshot<AppModelType, DbModelType>>;

export function getDocs<AppModelType, DbModelType extends DocumentData>(
query: Query<AppModelType, DbModelType>
): Promise<QuerySnapshot<AppModelType, DbModelType>> {
Expand All @@ -207,7 +218,7 @@
new QuerySnapshot<AppModelType, DbModelType>(
firestore,
userDataWriter,
query,
query as Query<AppModelType, DbModelType>,
snapshot
)
);
Expand Down Expand Up @@ -657,6 +668,7 @@
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;

export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
reference:
| Query<AppModelType, DbModelType>
Expand Down Expand Up @@ -691,7 +703,7 @@

let observer: PartialObserver<ViewSnapshot>;
let firestore: Firestore;
let internalQuery: InternalQuery;
let internalQuery: Query;

if (reference instanceof DocumentReference) {
firestore = cast(reference.firestore, Firestore);
Expand Down Expand Up @@ -744,6 +756,94 @@
);
}

export function onPipelineSnapshot(
query: RealtimePipeline,
observer: {
next?: (snapshot: RealtimePipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
export function onPipelineSnapshot(
query: RealtimePipeline,
options: SnapshotListenOptions,
observer: {
next?: (snapshot: RealtimePipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
export function onPipelineSnapshot(
query: RealtimePipeline,
onNext: (snapshot: RealtimePipelineSnapshot) => void,
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;
export function onPipelineSnapshot(
query: RealtimePipeline,
options: SnapshotListenOptions,
onNext: (snapshot: RealtimePipelineSnapshot) => void,
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;
export function onPipelineSnapshot(
reference: RealtimePipeline,
...args: unknown[]
): Unsubscribe {
reference = getModularInstance(reference);

let options: SnapshotListenOptions = {
includeMetadataChanges: false,
source: 'default'
};
let currArg = 0;
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
options = args[currArg] as SnapshotListenOptions;
currArg++;
}

const internalOptions = {
includeMetadataChanges: options.includeMetadataChanges,
source: options.source as ListenerDataSource
};

if (isPartialObserver(args[currArg])) {
const userObserver = args[currArg] as PartialObserver<
QuerySnapshot<AppModelType, DbModelType>
>;
args[currArg] = userObserver.next?.bind(userObserver);
args[currArg + 1] = userObserver.error?.bind(userObserver);
args[currArg + 2] = userObserver.complete?.bind(userObserver);
}

let observer: PartialObserver<ViewSnapshot>;
let firestore: Firestore;
let internalQuery: RealtimePipeline;

// RealtimePipeline
firestore = cast(reference._db, Firestore);

Check failure on line 824 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'firestore' is never reassigned. Use 'const' instead
internalQuery = toCorePipeline(reference);

Check failure on line 825 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'internalQuery' is never reassigned. Use 'const' instead
observer = {

Check failure on line 826 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'observer' is never reassigned. Use 'const' instead
next: snapshot => {
if (args[currArg]) {
(args[currArg] as NextFn<RealtimePipelineSnapshot>)(
new RealtimePipelineSnapshot(reference as RealtimePipeline, snapshot)
);
}
},
error: args[currArg + 1] as ErrorFn,
complete: args[currArg + 2] as CompleteFn
};

const client = ensureFirestoreConfigured(firestore);
return firestoreClientListen(
client,
internalQuery,
internalOptions,
observer
);
}

// TODO(firestorexp): Make sure these overloads are tested via the Firestore
// integration tests

Expand Down
Loading
Loading