Skip to content

Commit d66ddb0

Browse files
committed
Add PipelineOptions and StructuredPipeline
1 parent bb8fc27 commit d66ddb0

File tree

6 files changed

+549
-32
lines changed

6 files changed

+549
-32
lines changed

packages/firestore/src/api/pipeline_impl.ts

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,23 @@ import { Pipeline as LitePipeline } from '../lite-api/pipeline';
2121
import { PipelineResult, PipelineSnapshot } from '../lite-api/pipeline-result';
2222
import { PipelineSource } from '../lite-api/pipeline-source';
2323
import { Stage } from '../lite-api/stage';
24-
import { newUserDataReader } from '../lite-api/user_data_reader';
24+
import {
25+
newUserDataReader,
26+
parseData,
27+
UserDataReader,
28+
UserDataSource
29+
} from '../lite-api/user_data_reader';
2530
import { cast } from '../util/input_validation';
2631

2732
import { ensureFirestoreConfigured, Firestore } from './database';
2833
import { DocumentReference } from './reference';
2934
import { ExpUserDataWriter } from './user_data_writer';
35+
import { PipelineOptions } from '../lite-api/pipeline_settings';
36+
import {
37+
StructuredPipeline,
38+
StructuredPipelineOptions
39+
} from '../core/structured_pipeline';
40+
import { ApiClientObjectMap, Value } from '../protos/firestore_proto_api';
3041

3142
declare module './database' {
3243
interface Firestore {
@@ -68,35 +79,68 @@ declare module './database' {
6879
* @param pipeline The pipeline to execute.
6980
* @return A Promise representing the asynchronous pipeline execution.
7081
*/
71-
export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot> {
82+
export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot>;
83+
export function execute(options: PipelineOptions): Promise<PipelineSnapshot>;
84+
export function execute(
85+
pipelineOrOptions: LitePipeline | PipelineOptions
86+
): Promise<PipelineSnapshot> {
87+
let pipeline: LitePipeline =
88+
pipelineOrOptions instanceof LitePipeline
89+
? pipelineOrOptions
90+
: pipelineOrOptions.pipeline;
91+
let options: StructuredPipelineOptions = !(
92+
pipelineOrOptions instanceof LitePipeline
93+
)
94+
? pipelineOrOptions
95+
: {};
96+
let genericOptions: { [name: string]: unknown } =
97+
(pipelineOrOptions as PipelineOptions).genericOptions ?? {};
98+
7299
const firestore = cast(pipeline._db, Firestore);
73100
const client = ensureFirestoreConfigured(firestore);
74-
return firestoreClientExecutePipeline(client, pipeline).then(result => {
75-
// Get the execution time from the first result.
76-
// firestoreClientExecutePipeline returns at least one PipelineStreamElement
77-
// even if the returned document set is empty.
78-
const executionTime =
79-
result.length > 0 ? result[0].executionTime?.toTimestamp() : undefined;
80101

81-
const docs = result
82-
// Currently ignore any response from ExecutePipeline that does
83-
// not contain any document data in the `fields` property.
84-
.filter(element => !!element.fields)
85-
.map(
86-
element =>
87-
new PipelineResult(
88-
pipeline._userDataWriter,
89-
element.key?.path
90-
? new DocumentReference(firestore, null, element.key)
91-
: undefined,
92-
element.fields,
93-
element.createTime?.toTimestamp(),
94-
element.updateTime?.toTimestamp()
95-
)
96-
);
102+
const udr = new UserDataReader(
103+
firestore._databaseId,
104+
/* ignoreUndefinedProperties */ true
105+
);
106+
const context = udr.createContext(UserDataSource.Argument, 'execute');
107+
const optionsOverride: ApiClientObjectMap<Value> =
108+
parseData(genericOptions, context)?.mapValue?.fields ?? {};
97109

98-
return new PipelineSnapshot(pipeline, docs, executionTime);
99-
});
110+
let structuredPipeline: StructuredPipeline = new StructuredPipeline(
111+
pipeline,
112+
options,
113+
optionsOverride
114+
);
115+
116+
return firestoreClientExecutePipeline(client, structuredPipeline).then(
117+
result => {
118+
// Get the execution time from the first result.
119+
// firestoreClientExecutePipeline returns at least one PipelineStreamElement
120+
// even if the returned document set is empty.
121+
const executionTime =
122+
result.length > 0 ? result[0].executionTime?.toTimestamp() : undefined;
123+
124+
const docs = result
125+
// Currently ignore any response from ExecutePipeline that does
126+
// not contain any document data in the `fields` property.
127+
.filter(element => !!element.fields)
128+
.map(
129+
element =>
130+
new PipelineResult(
131+
pipeline._userDataWriter,
132+
element.key?.path
133+
? new DocumentReference(firestore, null, element.key)
134+
: undefined,
135+
element.fields,
136+
element.createTime?.toTimestamp(),
137+
element.updateTime?.toTimestamp()
138+
)
139+
);
140+
141+
return new PipelineSnapshot(pipeline, docs, executionTime);
142+
}
143+
);
100144
}
101145

102146
// Augment the Firestore class with the pipeline() factory method

packages/firestore/src/core/firestore_client.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ import { TransactionOptions } from './transaction_options';
102102
import { TransactionRunner } from './transaction_runner';
103103
import { View } from './view';
104104
import { ViewSnapshot } from './view_snapshot';
105+
import { StructuredPipeline } from './structured_pipeline';
105106

106107
const LOG_TAG = 'FirestoreClient';
107108
export const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
@@ -557,7 +558,7 @@ export function firestoreClientRunAggregateQuery(
557558

558559
export function firestoreClientExecutePipeline(
559560
client: FirestoreClient,
560-
pipeline: Pipeline
561+
pipeline: StructuredPipeline
561562
): Promise<PipelineStreamElement[]> {
562563
const deferred = new Deferred<PipelineStreamElement[]>();
563564

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* @license
3+
* Copyright 2025 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import {
19+
StructuredPipeline as StructuredPipelineProto,
20+
Pipeline as PipelineProto,
21+
ApiClientObjectMap,
22+
Value
23+
} from '../protos/firestore_proto_api';
24+
25+
import { JsonProtoSerializer, ProtoSerializable } from '../remote/serializer';
26+
import { ObjectValue } from '../model/object_value';
27+
import { FieldPath } from '../model/path';
28+
import { mapToArray } from '../util/obj';
29+
30+
export interface StructuredPipelineOptions {
31+
indexMode?: 'recommended';
32+
}
33+
34+
export class StructuredPipeline
35+
implements ProtoSerializable<StructuredPipelineProto>
36+
{
37+
constructor(
38+
private pipeline: ProtoSerializable<PipelineProto>,
39+
private options: StructuredPipelineOptions,
40+
private optionsOverride: ApiClientObjectMap<Value>
41+
) {}
42+
43+
/**
44+
* @private
45+
* @internal for testing
46+
*/
47+
_getKnownOptions(): ObjectValue {
48+
const options: ObjectValue = ObjectValue.empty();
49+
50+
/** SERIALIZE KNOWN OPTIONS **/
51+
if (typeof this.options.indexMode === 'string') {
52+
options.set(FieldPath.fromServerFormat('index_mode'), {
53+
stringValue: this.options.indexMode
54+
});
55+
}
56+
57+
return options;
58+
}
59+
60+
private getOptionsProto(): ApiClientObjectMap<Value> {
61+
const options: ObjectValue = this._getKnownOptions();
62+
63+
/** APPLY OPTIONS OVERRIDES **/
64+
const optionsMap = new Map(
65+
mapToArray(this.optionsOverride, (value, key) => [
66+
FieldPath.fromServerFormat(key),
67+
value
68+
])
69+
);
70+
options.setAll(optionsMap);
71+
72+
return options.value.mapValue.fields ?? {};
73+
}
74+
75+
_toProto(serializer: JsonProtoSerializer): StructuredPipelineProto {
76+
return {
77+
pipeline: this.pipeline._toProto(serializer),
78+
options: this.getOptionsProto()
79+
};
80+
}
81+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import type { Pipeline } from './pipeline';
2+
3+
/**
4+
* Options defining how a Pipeline is evaluated.
5+
*/
6+
export interface PipelineOptions {
7+
/**
8+
* Pipeline to be evaluated.
9+
*/
10+
pipeline: Pipeline;
11+
12+
/**
13+
* Specify the index mode.
14+
*/
15+
indexMode?: 'recommended';
16+
17+
/**
18+
* An escape hatch to set options not known at SDK build time. These values
19+
* will be passed directly to the Firestore backend and not used by the SDK.
20+
*
21+
* The generic option name will be used as provided. And must match the name
22+
* format used by the backend (hint: use a snake_case_name).
23+
*
24+
* Generic option values can be any type supported
25+
* by Firestore (for example: string, boolean, number, map, …). Value types
26+
* not known to the SDK will be rejected.
27+
*
28+
* Values specified in genericOptions will take precedence over any options
29+
* with the same name set by the SDK.
30+
*
31+
* Override the `example_option`:
32+
* ```
33+
* execute({
34+
* pipeline: myPipeline,
35+
* genericOptions: {
36+
* // Override `example_option`. This will not
37+
* // merge with the existing `example_option` object.
38+
* "example_option": {
39+
* foo: "bar"
40+
* }
41+
* }
42+
* }
43+
* ```
44+
*
45+
* `genericOptions` supports dot notation, if you want to override
46+
* a nested option.
47+
* ```
48+
* execute({
49+
* pipeline: myPipeline,
50+
* genericOptions: {
51+
* // Override `example_option.foo` and do not override
52+
* // any other properties of `example_option`.
53+
* "example_option.foo": "bar"
54+
* }
55+
* }
56+
* ```
57+
*/
58+
genericOptions?: {
59+
[name: string]: unknown;
60+
};
61+
}

packages/firestore/src/remote/datastore.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { User } from '../auth/user';
2020
import { Aggregate } from '../core/aggregate';
2121
import { DatabaseId } from '../core/database_info';
2222
import { queryToAggregateTarget, Query, queryToTarget } from '../core/query';
23-
import { Pipeline } from '../lite-api/pipeline';
23+
import { StructuredPipeline } from '../core/structured_pipeline';
2424
import { Document } from '../model/document';
2525
import { DocumentKey } from '../model/document_key';
2626
import { Mutation } from '../model/mutation';
@@ -242,14 +242,12 @@ export async function invokeBatchGetDocumentsRpc(
242242

243243
export async function invokeExecutePipeline(
244244
datastore: Datastore,
245-
pipeline: Pipeline
245+
structuredPipeline: StructuredPipeline
246246
): Promise<PipelineStreamElement[]> {
247247
const datastoreImpl = debugCast(datastore, DatastoreImpl);
248248
const executePipelineRequest: ProtoExecutePipelineRequest = {
249249
database: getEncodedDatabaseId(datastoreImpl.serializer),
250-
structuredPipeline: {
251-
pipeline: pipeline._toProto(datastoreImpl.serializer)
252-
}
250+
structuredPipeline: structuredPipeline._toProto(datastoreImpl.serializer)
253251
};
254252

255253
const response = await datastoreImpl.invokeStreamingRPC<

0 commit comments

Comments
 (0)