Skip to content

Commit 80075c8

Browse files
committed
[2/4] Offline pipeline evaluation and tests
1 parent 7146778 commit 80075c8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+18262
-94
lines changed

common/api-review/firestore-pipelines.api.md

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,6 +1001,8 @@ export class Pipeline {
10011001
sort(ordering: Ordering, ...additionalOrderings: Ordering[]): Pipeline;
10021002
// (undocumented)
10031003
sort(options: SortStageOptions): Pipeline;
1004+
// Warning: (ae-incompatible-release-tags) The symbol "stages" is marked as @public, but its signature references "Stage" which is marked as @beta
1005+
//
10041006
// (undocumented)
10051007
stages: any;
10061008
// (undocumented)
@@ -1013,8 +1015,6 @@ export class Pipeline {
10131015
unnest(selectable: Selectable, indexField?: string): Pipeline;
10141016
// (undocumented)
10151017
unnest(options: UnnestStageOptions): Pipeline;
1016-
// (undocumented)
1017-
userDataReader: any;
10181018
// Warning: (ae-incompatible-release-tags) The symbol "where" is marked as @public, but its signature references "BooleanExpression" which is marked as @beta
10191019
//
10201020
// (undocumented)
@@ -1032,18 +1032,42 @@ export interface PipelineExecuteOptions {
10321032
};
10331033
}
10341034

1035-
// Warning: (ae-forgotten-export) The symbol "DocumentData" needs to be exported by the entry point pipelines.d.ts
1036-
//
10371035
// @beta
1038-
export class PipelineResult<AppModelType = DocumentData> {
1036+
export class PipelineResult {
1037+
get createTime(): Timestamp | undefined;
10391038
/* Excluded from this release type: _ref */
10401039
/* Excluded from this release type: _fields */
10411040
/* Excluded from this release type: __constructor */
1042-
get createTime(): Timestamp | undefined;
1043-
data(): AppModelType;
1041+
/* Excluded from this release type: fromDocument */
1042+
// Warning: (ae-forgotten-export) The symbol "DocumentData" needs to be exported by the entry point pipelines.d.ts
1043+
data(): DocumentData | undefined;
1044+
/* Excluded from this release type: _ref */
1045+
/* Excluded from this release type: _fields */
1046+
/* Excluded from this release type: __constructor */
1047+
/* Excluded from this release type: fromDocument */
10441048
get(fieldPath: string | FieldPath | Field): any;
1049+
/* Excluded from this release type: _ref */
1050+
/* Excluded from this release type: _fields */
1051+
/* Excluded from this release type: __constructor */
1052+
/* Excluded from this release type: fromDocument */
10451053
get id(): string | undefined;
1054+
/* Excluded from this release type: _ref */
1055+
/* Excluded from this release type: _fields */
1056+
/* Excluded from this release type: __constructor */
1057+
/* Excluded from this release type: fromDocument */
1058+
// Warning: (ae-forgotten-export) The symbol "SnapshotMetadata" needs to be exported by the entry point pipelines.d.ts
1059+
//
1060+
// (undocumented)
1061+
readonly metadata?: SnapshotMetadata | undefined;
1062+
/* Excluded from this release type: _ref */
1063+
/* Excluded from this release type: _fields */
1064+
/* Excluded from this release type: __constructor */
1065+
/* Excluded from this release type: fromDocument */
10461066
get ref(): DocumentReference | undefined;
1067+
/* Excluded from this release type: _ref */
1068+
/* Excluded from this release type: _fields */
1069+
/* Excluded from this release type: __constructor */
1070+
/* Excluded from this release type: fromDocument */
10471071
get updateTime(): Timestamp | undefined;
10481072
}
10491073

@@ -1065,7 +1089,7 @@ export class PipelineSource<PipelineType> {
10651089
collection(options: CollectionStageOptions): PipelineType;
10661090
collectionGroup(collectionId: string): PipelineType;
10671091
collectionGroup(options: CollectionGroupStageOptions): PipelineType;
1068-
createFrom(query: Query): Pipeline;
1092+
createFrom(query: Query): PipelineType;
10691093
database(): PipelineType;
10701094
database(options: DatabaseStageOptions): PipelineType;
10711095
documents(docs: Array<string | DocumentReference>): PipelineType;

packages/firestore/src/api/pipeline_impl.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ export function execute(
140140
element =>
141141
new PipelineResult(
142142
pipeline._userDataWriter,
143-
element.fields!,
144143
element.key?.path
145144
? new DocumentReference(firestore, null, element.key)
146145
: undefined,
146+
element.fields,
147147
element.createTime?.toTimestamp(),
148148
element.updateTime?.toTimestamp()
149149
)
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import { Firestore } from '../lite-api/database';
2+
import { BooleanExpression, Ordering } from '../lite-api/expressions';
3+
import { isReadableUserData, ReadableUserData } from '../lite-api/pipeline';
4+
import { Limit, Sort, Stage, Where } from '../lite-api/stage';
5+
import { UserDataReader, UserDataSource } from '../lite-api/user_data_reader';
6+
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';
7+
import {
8+
Stage as ProtoStage,
9+
StructuredPipeline
10+
} from '../protos/firestore_proto_api';
11+
import { JsonProtoSerializer } from '../remote/serializer';
12+
13+
/**
14+
* @beta
15+
*
16+
* The RealtimePipeline class provides a flexible and expressive framework for building complex data
17+
* transformation and query pipelines that can be used with Firestore's real-time and offline capabilities.
18+
*
19+
* A RealtimePipeline takes data sources, such as Firestore collections or collection groups, and applies
20+
* a series of stages that are chained together. Each stage takes the output from the previous stage
21+
* (or the data source) and produces an output for the next stage (or as the final output of the
22+
* pipeline).
23+
*
24+
* Expressions can be used within each stage to filter and transform data through the stage.
25+
*
26+
* NOTE: Both the initial and subsequent snapshots for RealtimePipeline take the consideration of the SDK's cache.
27+
* They might include results that have not been synchronized with the server yet, and wait for subsequent snapshots
28+
* to reflect the latest server state, this is the same as classic Firestore {@link Query}.
29+
* This behavior is different from the {@link Pipeline} class, which does not take the consideration of the SDK's cache.
30+
*
31+
* Usage Examples:
32+
*
33+
* ```typescript
34+
* const db: Firestore; // Assumes a valid firestore instance.
35+
*
36+
* // Example 1: Listen to books published after 1980
37+
* const unsubscribe = onRealtimePipelineSnapshot(db.realtimePipeline()
38+
* .collection("books")
39+
* .where(field("published").gt(1980)),
40+
* (snapshot) => {
41+
* // Handle the snapshot
42+
* }
43+
* );
44+
* ```
45+
*/
46+
// TODO(pipeline): Add more examples to showcase functions
47+
export class RealtimePipeline {
48+
/**
49+
* @internal
50+
* @private
51+
* @param _db
52+
* @param userDataReader
53+
* @param _userDataWriter
54+
* @param _documentReferenceFactory
55+
* @param stages
56+
*/
57+
constructor(
58+
/**
59+
* @internal
60+
* @private
61+
*/
62+
public _db: Firestore,
63+
/**
64+
* @internal
65+
* @private
66+
*/
67+
readonly userDataReader: UserDataReader,
68+
/**
69+
* @internal
70+
* @private
71+
*/
72+
public _userDataWriter: AbstractUserDataWriter,
73+
readonly stages: Stage[]
74+
) {}
75+
76+
/**
77+
* Reads user data for each expression in the expressionMap.
78+
* @param name Name of the calling function. Used for error messages when invalid user data is encountered.
79+
* @param expressionMap
80+
* @return the expressionMap argument.
81+
* @private
82+
* @internal
83+
*/
84+
protected readUserData<
85+
T extends
86+
| Map<string, ReadableUserData>
87+
| ReadableUserData[]
88+
| ReadableUserData
89+
>(name: string, expressionMap: T): T {
90+
const context = this.userDataReader.createContext(
91+
UserDataSource.Argument,
92+
name
93+
);
94+
if (isReadableUserData(expressionMap)) {
95+
expressionMap._readUserData(context);
96+
} else if (Array.isArray(expressionMap)) {
97+
expressionMap.forEach(readableData =>
98+
readableData._readUserData(context)
99+
);
100+
} else {
101+
expressionMap.forEach(expr => expr._readUserData(context));
102+
}
103+
return expressionMap;
104+
}
105+
106+
where(condition: BooleanExpression): RealtimePipeline {
107+
const copy = this.stages.map(s => s);
108+
this.readUserData('where', condition);
109+
copy.push(new Where(condition, {}));
110+
return new RealtimePipeline(
111+
this._db,
112+
this.userDataReader,
113+
this._userDataWriter,
114+
copy
115+
);
116+
}
117+
118+
limit(limit: number): RealtimePipeline {
119+
const copy = this.stages.map(s => s);
120+
copy.push(new Limit(limit, {}));
121+
return new RealtimePipeline(
122+
this._db,
123+
this.userDataReader,
124+
this._userDataWriter,
125+
copy
126+
);
127+
}
128+
129+
sort(...orderings: Ordering[]): RealtimePipeline;
130+
sort(options: { orderings: Ordering[] }): RealtimePipeline;
131+
sort(
132+
optionsOrOrderings:
133+
| Ordering
134+
| {
135+
orderings: Ordering[];
136+
},
137+
...rest: Ordering[]
138+
): RealtimePipeline {
139+
const copy = this.stages.map(s => s);
140+
// Option object
141+
if ('orderings' in optionsOrOrderings) {
142+
copy.push(
143+
new Sort(this.readUserData('sort', optionsOrOrderings.orderings), {})
144+
);
145+
} else {
146+
// Ordering object
147+
copy.push(
148+
new Sort(this.readUserData('sort', [optionsOrOrderings, ...rest]), {})
149+
);
150+
}
151+
152+
return new RealtimePipeline(
153+
this._db,
154+
this.userDataReader,
155+
this._userDataWriter,
156+
copy
157+
);
158+
}
159+
160+
/**
161+
* @internal
162+
* @private
163+
*/
164+
_toStructuredPipeline(
165+
jsonProtoSerializer: JsonProtoSerializer
166+
): StructuredPipeline {
167+
const stages: ProtoStage[] = this.stages.map(stage =>
168+
stage._toProto(jsonProtoSerializer)
169+
);
170+
return { pipeline: { stages } };
171+
}
172+
}

0 commit comments

Comments
 (0)