Skip to content

Commit e64cf0b

Browse files
committed
[3/4] Integrate realtime ppl into SDK cache management and tests
1 parent 80075c8 commit e64cf0b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+4199
-1728
lines changed

packages/firestore/src/api/pipeline_impl.ts

Lines changed: 133 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,21 @@
1515
* limitations under the License.
1616
*/
1717

18-
import { Pipeline } from '../api/pipeline';
19-
import { firestoreClientExecutePipeline } from '../core/firestore_client';
18+
// Re-adding necessary imports that were removed previously
19+
import {
20+
CompleteFn,
21+
ErrorFn,
22+
isPartialObserver,
23+
NextFn,
24+
PartialObserver
25+
} from '../api/observer';
26+
import {
27+
firestoreClientExecutePipeline,
28+
firestoreClientListen
29+
} from '../core/firestore_client';
30+
import { ListenerDataSource } from '../core/event_manager';
31+
import { toCorePipeline } from '../core/pipeline-util';
32+
import { ViewSnapshot } from '../core/view_snapshot';
2033
import {
2134
StructuredPipeline,
2235
StructuredPipelineOptions
@@ -31,10 +44,15 @@ import {
3144
UserDataReader,
3245
UserDataSource
3346
} from '../lite-api/user_data_reader';
47+
import { FirestoreError } from '../util/error';
3448
import { cast } from '../util/input_validation';
3549

3650
import { ensureFirestoreConfigured, Firestore } from './database';
51+
import { Pipeline } from './pipeline'; // Keep this specific Pipeline import if needed alongside LitePipeline
52+
import { RealtimePipeline } from './realtime_pipeline';
3753
import { DocumentReference } from './reference';
54+
import { SnapshotListenOptions, Unsubscribe } from './reference_impl';
55+
import { RealtimePipelineSnapshot } from './snapshot';
3856
import { ExpUserDataWriter } from './user_data_writer';
3957

4058
declare module './database' {
@@ -49,6 +67,7 @@ declare module './database' {
4967
*/
5068
interface Firestore {
5169
pipeline(): PipelineSource<Pipeline>;
70+
realtimePipeline(): PipelineSource<RealtimePipeline>;
5271
}
5372
}
5473

@@ -179,3 +198,115 @@ Firestore.prototype.pipeline = function (): PipelineSource<Pipeline> {
179198
}
180199
);
181200
};
201+
202+
Firestore.prototype.realtimePipeline =
203+
function (): PipelineSource<RealtimePipeline> {
204+
const userDataReader = newUserDataReader(this);
205+
return new PipelineSource<RealtimePipeline>(
206+
this._databaseId,
207+
userDataReader,
208+
(stages: Stage[]) => {
209+
return new RealtimePipeline(
210+
this,
211+
newUserDataReader(this),
212+
new ExpUserDataWriter(this),
213+
stages
214+
);
215+
}
216+
);
217+
};
218+
219+
/**
220+
* @internal
221+
* @private
222+
*/
223+
export function _onRealtimePipelineSnapshot(
224+
pipeline: RealtimePipeline,
225+
observer: {
226+
next?: (snapshot: RealtimePipelineSnapshot) => void;
227+
error?: (error: FirestoreError) => void;
228+
complete?: () => void;
229+
}
230+
): Unsubscribe;
231+
/**
232+
* @internal
233+
* @private
234+
*/
235+
export function _onRealtimePipelineSnapshot(
236+
pipeline: RealtimePipeline,
237+
options: SnapshotListenOptions,
238+
observer: {
239+
next?: (snapshot: RealtimePipelineSnapshot) => void;
240+
error?: (error: FirestoreError) => void;
241+
complete?: () => void;
242+
}
243+
): Unsubscribe;
244+
/**
245+
* @internal
246+
* @private
247+
*/
248+
export function _onRealtimePipelineSnapshot(
249+
pipeline: RealtimePipeline,
250+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
251+
onError?: (error: FirestoreError) => void,
252+
onComplete?: () => void
253+
): Unsubscribe;
254+
/**
255+
* @internal
256+
* @private
257+
*/
258+
export function _onRealtimePipelineSnapshot(
259+
pipeline: RealtimePipeline,
260+
options: SnapshotListenOptions,
261+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
262+
onError?: (error: FirestoreError) => void,
263+
onComplete?: () => void
264+
): Unsubscribe;
265+
export function _onRealtimePipelineSnapshot(
266+
pipeline: RealtimePipeline,
267+
...args: unknown[]
268+
): Unsubscribe {
269+
let options: SnapshotListenOptions = {
270+
includeMetadataChanges: false,
271+
source: 'default'
272+
};
273+
let currArg = 0;
274+
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
275+
options = args[currArg] as SnapshotListenOptions;
276+
currArg++;
277+
}
278+
279+
const internalOptions = {
280+
includeMetadataChanges: options.includeMetadataChanges,
281+
source: options.source as ListenerDataSource
282+
};
283+
284+
let userObserver: PartialObserver<RealtimePipelineSnapshot>;
285+
if (isPartialObserver(args[currArg])) {
286+
userObserver = args[currArg] as PartialObserver<RealtimePipelineSnapshot>;
287+
} else {
288+
userObserver = {
289+
next: args[currArg] as NextFn<RealtimePipelineSnapshot>,
290+
error: args[currArg + 1] as ErrorFn,
291+
complete: args[currArg + 2] as CompleteFn
292+
};
293+
}
294+
295+
const client = ensureFirestoreConfigured(pipeline._db as Firestore);
296+
const observer = {
297+
next: (snapshot: ViewSnapshot) => {
298+
if (userObserver.next) {
299+
userObserver.next(new RealtimePipelineSnapshot(pipeline, snapshot));
300+
}
301+
},
302+
error: userObserver.error,
303+
complete: userObserver.complete
304+
};
305+
306+
return firestoreClientListen(
307+
client,
308+
toCorePipeline(pipeline),
309+
internalOptions, // Pass parsed options here
310+
observer
311+
);
312+
}

packages/firestore/src/api/reference_impl.ts

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ import {
3535
firestoreClientListen,
3636
firestoreClientWrite
3737
} from '../core/firestore_client';
38-
import { newQueryForPath, Query as InternalQuery } from '../core/query';
38+
import { QueryOrPipeline, toCorePipeline } from '../core/pipeline-util';
39+
import { Query as InternalQuery, newQueryForPath } from '../core/query';
3940
import { ViewSnapshot } from '../core/view_snapshot';
4041
import { FieldPath } from '../lite-api/field_path';
4142
import { validateHasExplicitOrderByForLimitToLast } from '../lite-api/query';
@@ -69,9 +70,12 @@ import {
6970
DocumentSnapshot,
7071
FirestoreDataConverter,
7172
QuerySnapshot,
73+
RealtimePipelineSnapshot,
7274
SnapshotMetadata
7375
} from './snapshot';
7476
import { ExpUserDataWriter } from './user_data_writer';
77+
import { RealtimePipeline } from './realtime_pipeline';
78+
import { CorePipeline } from '../core/pipeline';
7579

7680
/**
7781
* An options object that can be passed to {@link (onSnapshot:1)} and {@link
@@ -197,6 +201,10 @@ export function getDocFromServer<
197201
*
198202
* @returns A `Promise` that will be resolved with the results of the query.
199203
*/
204+
export function getDocs<AppModelType, DbModelType extends DocumentData>(
205+
query: Query<AppModelType, DbModelType>
206+
): Promise<QuerySnapshot<AppModelType, DbModelType>>;
207+
200208
export function getDocs<AppModelType, DbModelType extends DocumentData>(
201209
query: Query<AppModelType, DbModelType>
202210
): Promise<QuerySnapshot<AppModelType, DbModelType>> {
@@ -214,7 +222,7 @@ export function getDocs<AppModelType, DbModelType extends DocumentData>(
214222
new QuerySnapshot<AppModelType, DbModelType>(
215223
firestore,
216224
userDataWriter,
217-
query,
225+
query as Query<AppModelType, DbModelType>,
218226
snapshot
219227
)
220228
);
@@ -642,6 +650,7 @@ export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
642650
onError?: (error: FirestoreError) => void,
643651
onCompletion?: () => void
644652
): Unsubscribe;
653+
645654
export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
646655
reference:
647656
| Query<AppModelType, DbModelType>
@@ -1061,6 +1070,94 @@ export function onSnapshotResume<
10611070
}
10621071
}
10631072

1073+
export function onPipelineSnapshot(
1074+
query: RealtimePipeline,
1075+
observer: {
1076+
next?: (snapshot: RealtimePipelineSnapshot) => void;
1077+
error?: (error: FirestoreError) => void;
1078+
complete?: () => void;
1079+
}
1080+
): Unsubscribe;
1081+
export function onPipelineSnapshot(
1082+
query: RealtimePipeline,
1083+
options: SnapshotListenOptions,
1084+
observer: {
1085+
next?: (snapshot: RealtimePipelineSnapshot) => void;
1086+
error?: (error: FirestoreError) => void;
1087+
complete?: () => void;
1088+
}
1089+
): Unsubscribe;
1090+
export function onPipelineSnapshot(
1091+
query: RealtimePipeline,
1092+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
1093+
onError?: (error: FirestoreError) => void,
1094+
onCompletion?: () => void
1095+
): Unsubscribe;
1096+
export function onPipelineSnapshot(
1097+
query: RealtimePipeline,
1098+
options: SnapshotListenOptions,
1099+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
1100+
onError?: (error: FirestoreError) => void,
1101+
onCompletion?: () => void
1102+
): Unsubscribe;
1103+
export function onPipelineSnapshot(
1104+
reference: RealtimePipeline,
1105+
...args: unknown[]
1106+
): Unsubscribe {
1107+
reference = getModularInstance(reference);
1108+
1109+
let options: SnapshotListenOptions = {
1110+
includeMetadataChanges: false,
1111+
source: 'default'
1112+
};
1113+
let currArg = 0;
1114+
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
1115+
options = args[currArg] as SnapshotListenOptions;
1116+
currArg++;
1117+
}
1118+
1119+
const internalOptions = {
1120+
includeMetadataChanges: options.includeMetadataChanges,
1121+
source: options.source as ListenerDataSource
1122+
};
1123+
1124+
if (isPartialObserver(args[currArg])) {
1125+
const userObserver = args[
1126+
currArg
1127+
] as PartialObserver<RealtimePipelineSnapshot>;
1128+
args[currArg] = userObserver.next?.bind(userObserver);
1129+
args[currArg + 1] = userObserver.error?.bind(userObserver);
1130+
args[currArg + 2] = userObserver.complete?.bind(userObserver);
1131+
}
1132+
1133+
let observer: PartialObserver<ViewSnapshot>;
1134+
let firestore: Firestore;
1135+
let internalQuery: CorePipeline;
1136+
1137+
// RealtimePipeline
1138+
firestore = cast(reference._db, Firestore);
1139+
internalQuery = toCorePipeline(reference);
1140+
observer = {
1141+
next: snapshot => {
1142+
if (args[currArg]) {
1143+
(args[currArg] as NextFn<RealtimePipelineSnapshot>)(
1144+
new RealtimePipelineSnapshot(reference as RealtimePipeline, snapshot)
1145+
);
1146+
}
1147+
},
1148+
error: args[currArg + 1] as ErrorFn,
1149+
complete: args[currArg + 2] as CompleteFn
1150+
};
1151+
1152+
const client = ensureFirestoreConfigured(firestore);
1153+
return firestoreClientListen(
1154+
client,
1155+
internalQuery,
1156+
internalOptions,
1157+
observer
1158+
);
1159+
}
1160+
10641161
// TODO(firestorexp): Make sure these overloads are tested via the Firestore
10651162
// integration tests
10661163

0 commit comments

Comments
 (0)