Skip to content

Commit d7a3962

Browse files
committed
Add options to onRealtimePipelineSnapshots
1 parent b266798 commit d7a3962

File tree

11 files changed

+281
-66
lines changed

11 files changed

+281
-66
lines changed

packages/firestore/src/api/pipeline_impl.ts

Lines changed: 92 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,19 @@
1515
* limitations under the License.
1616
*/
1717

18-
import { Pipeline } from '../api/pipeline';
18+
// Re-adding necessary imports that were removed previously
19+
import {
20+
CompleteFn,
21+
ErrorFn,
22+
isPartialObserver,
23+
NextFn,
24+
PartialObserver
25+
} from '../api/observer';
1926
import {
2027
firestoreClientExecutePipeline,
2128
firestoreClientListen
2229
} from '../core/firestore_client';
30+
import { ListenerDataSource } from '../core/event_manager';
2331
import { toCorePipeline } from '../core/pipeline-util';
2432
import { ViewSnapshot } from '../core/view_snapshot';
2533
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
@@ -31,9 +39,10 @@ import { FirestoreError } from '../util/error';
3139
import { cast } from '../util/input_validation';
3240

3341
import { ensureFirestoreConfigured, Firestore } from './database';
42+
import { Pipeline } from './pipeline'; // Keep this specific Pipeline import if needed alongside LitePipeline
3443
import { RealtimePipeline } from './realtime_pipeline';
3544
import { DocumentReference } from './reference';
36-
import { Unsubscribe } from './reference_impl';
45+
import { SnapshotListenOptions, Unsubscribe } from './reference_impl';
3746
import { RealtimePipelineSnapshot } from './snapshot';
3847
import { ExpUserDataWriter } from './user_data_writer';
3948

@@ -143,18 +152,91 @@ Firestore.prototype.realtimePipeline =
143152
*/
144153
export function _onRealtimePipelineSnapshot(
145154
pipeline: RealtimePipeline,
146-
next: (snapshot: RealtimePipelineSnapshot) => void,
147-
error?: (error: FirestoreError) => void,
148-
complete?: () => void
155+
observer: {
156+
next?: (snapshot: RealtimePipelineSnapshot) => void;
157+
error?: (error: FirestoreError) => void;
158+
complete?: () => void;
159+
}
160+
): Unsubscribe;
161+
/**
162+
* @internal
163+
* @private
164+
*/
165+
export function _onRealtimePipelineSnapshot(
166+
pipeline: RealtimePipeline,
167+
options: SnapshotListenOptions,
168+
observer: {
169+
next?: (snapshot: RealtimePipelineSnapshot) => void;
170+
error?: (error: FirestoreError) => void;
171+
complete?: () => void;
172+
}
173+
): Unsubscribe;
174+
/**
175+
* @internal
176+
* @private
177+
*/
178+
export function _onRealtimePipelineSnapshot(
179+
pipeline: RealtimePipeline,
180+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
181+
onError?: (error: FirestoreError) => void,
182+
onComplete?: () => void
183+
): Unsubscribe;
184+
/**
185+
* @internal
186+
* @private
187+
*/
188+
export function _onRealtimePipelineSnapshot(
189+
pipeline: RealtimePipeline,
190+
options: SnapshotListenOptions,
191+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
192+
onError?: (error: FirestoreError) => void,
193+
onComplete?: () => void
194+
): Unsubscribe;
195+
export function _onRealtimePipelineSnapshot(
196+
pipeline: RealtimePipeline,
197+
...args: unknown[]
149198
): Unsubscribe {
199+
let options: SnapshotListenOptions = {
200+
includeMetadataChanges: false,
201+
source: 'default'
202+
};
203+
let currArg = 0;
204+
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
205+
options = args[currArg] as SnapshotListenOptions;
206+
currArg++;
207+
}
208+
209+
const internalOptions = {
210+
includeMetadataChanges: options.includeMetadataChanges,
211+
source: options.source as ListenerDataSource
212+
};
213+
214+
let userObserver: PartialObserver<RealtimePipelineSnapshot>;
215+
if (isPartialObserver(args[currArg])) {
216+
userObserver = args[currArg] as PartialObserver<RealtimePipelineSnapshot>;
217+
} else {
218+
userObserver = {
219+
next: args[currArg] as NextFn<RealtimePipelineSnapshot>,
220+
error: args[currArg + 1] as ErrorFn,
221+
complete: args[currArg + 2] as CompleteFn
222+
};
223+
}
224+
150225
const client = ensureFirestoreConfigured(pipeline._db as Firestore);
151226
const observer = {
152227
next: (snapshot: ViewSnapshot) => {
153-
next(new RealtimePipelineSnapshot(pipeline, snapshot));
228+
if (userObserver.next) {
229+
userObserver.next(new RealtimePipelineSnapshot(pipeline, snapshot));
230+
}
154231
},
155-
error,
156-
complete
232+
error: userObserver.error,
233+
complete: userObserver.complete
157234
};
158-
// TODO(pipeline) hook up options
159-
return firestoreClientListen(client, toCorePipeline(pipeline), {}, observer);
235+
236+
return firestoreClientListen(
237+
client,
238+
toCorePipeline(pipeline),
239+
internalOptions, // Pass parsed options here
240+
observer
241+
);
160242
}

packages/firestore/src/core/pipeline-util.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ function reverseOrderings(orderings: Ordering[]): Ordering[] {
211211
);
212212
}
213213

214-
export function toPipeline(query: Query, db: Firestore): Pipeline {
214+
export function toPipelineStages(query: Query, db: Firestore): Stage[] {
215215
let pipeline: Pipeline;
216216
if (isCollectionGroupQuery(query)) {
217217
pipeline = db.pipeline().collectionGroup(query.collectionGroup!);
@@ -287,7 +287,7 @@ export function toPipeline(query: Query, db: Firestore): Pipeline {
287287
}
288288
}
289289

290-
return pipeline;
290+
return pipeline.stages;
291291
}
292292

293293
function whereConditionsFromCursor(

packages/firestore/src/lite-api/pipeline-source.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717

1818
import { DatabaseId } from '../core/database_info';
19-
import { toPipeline } from '../core/pipeline-util';
19+
import { toPipelineStages } from '../core/pipeline-util';
2020
import { FirestoreError, Code } from '../util/error';
2121

2222
import { Pipeline } from './pipeline';
@@ -114,8 +114,10 @@ export class PipelineSource<PipelineType> {
114114
*
115115
* @throws {@FirestoreError} Thrown if any of the provided DocumentReferences target a different project or database than the pipeline.
116116
*/
117-
createFrom(query: Query): Pipeline {
118-
return toPipeline(query._query, query.firestore);
117+
createFrom(query: Query): PipelineType {
118+
return this._createPipeline(
119+
toPipelineStages(query._query, query.firestore)
120+
);
119121
}
120122

121123
_validateReference(reference: CollectionReference | DocumentReference): void {

packages/firestore/src/lite-api/pipeline_impl.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
import { RealtimePipeline } from '../api/realtime_pipeline';
1819
import { invokeExecutePipeline } from '../remote/datastore';
1920

2021
import { getDatastore } from './components';
@@ -30,6 +31,7 @@ import { newUserDataReader } from './user_data_reader';
3031
declare module './database' {
3132
interface Firestore {
3233
pipeline(): PipelineSource<Pipeline>;
34+
realtimePipeline(): PipelineSource<RealtimePipeline>;
3335
}
3436
}
3537

0 commit comments

Comments
 (0)