Skip to content

Commit 281c441

Browse files
committed
Merge branch 'markduckworth/pipeline-options' into markduckworth/ppl-node-backport
2 parents b3d8c75 + 45688b0 commit 281c441

File tree

7 files changed

+746
-33
lines changed

7 files changed

+746
-33
lines changed

packages/firestore/src/api/pipeline_impl.ts

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,22 @@
1717

1818
import { Pipeline } from '../api/pipeline';
1919
import { firestoreClientExecutePipeline } from '../core/firestore_client';
20+
import {
21+
StructuredPipeline,
22+
StructuredPipelineOptions
23+
} from '../core/structured_pipeline';
2024
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
2125
import { PipelineResult, PipelineSnapshot } from '../lite-api/pipeline-result';
2226
import { PipelineSource } from '../lite-api/pipeline-source';
27+
import { PipelineOptions } from '../lite-api/pipeline_settings';
2328
import { Stage } from '../lite-api/stage';
24-
import { newUserDataReader } from '../lite-api/user_data_reader';
29+
import {
30+
newUserDataReader,
31+
parseData,
32+
UserDataReader,
33+
UserDataSource
34+
} from '../lite-api/user_data_reader';
35+
import { ApiClientObjectMap, Value } from '../protos/firestore_proto_api';
2536
import { cast } from '../util/input_validation';
2637

2738
import { ensureFirestoreConfigured, Firestore } from './database';
@@ -68,35 +79,68 @@ declare module './database' {
6879
* @param pipeline The pipeline to execute.
6980
* @return A Promise representing the asynchronous pipeline execution.
7081
*/
71-
export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot> {
82+
export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot>;
83+
export function execute(options: PipelineOptions): Promise<PipelineSnapshot>;
84+
export function execute(
85+
pipelineOrOptions: LitePipeline | PipelineOptions
86+
): Promise<PipelineSnapshot> {
87+
const pipeline: LitePipeline =
88+
pipelineOrOptions instanceof LitePipeline
89+
? pipelineOrOptions
90+
: pipelineOrOptions.pipeline;
91+
const options: StructuredPipelineOptions = !(
92+
pipelineOrOptions instanceof LitePipeline
93+
)
94+
? pipelineOrOptions
95+
: {};
96+
const genericOptions: { [name: string]: unknown } =
97+
(pipelineOrOptions as PipelineOptions).genericOptions ?? {};
98+
7299
const firestore = cast(pipeline._db, Firestore);
73100
const client = ensureFirestoreConfigured(firestore);
74-
return firestoreClientExecutePipeline(client, pipeline).then(result => {
75-
// Get the execution time from the first result.
76-
// firestoreClientExecutePipeline returns at least one PipelineStreamElement
77-
// even if the returned document set is empty.
78-
const executionTime =
79-
result.length > 0 ? result[0].executionTime?.toTimestamp() : undefined;
80101

81-
const docs = result
82-
// Currently ignore any response from ExecutePipeline that does
83-
// not contain any document data in the `fields` property.
84-
.filter(element => !!element.fields)
85-
.map(
86-
element =>
87-
new PipelineResult(
88-
pipeline._userDataWriter,
89-
element.key?.path
90-
? new DocumentReference(firestore, null, element.key)
91-
: undefined,
92-
element.fields,
93-
element.createTime?.toTimestamp(),
94-
element.updateTime?.toTimestamp()
95-
)
96-
);
102+
const udr = new UserDataReader(
103+
firestore._databaseId,
104+
/* ignoreUndefinedProperties */ true
105+
);
106+
const context = udr.createContext(UserDataSource.Argument, 'execute');
107+
const optionsOverride: ApiClientObjectMap<Value> =
108+
parseData(genericOptions, context)?.mapValue?.fields ?? {};
97109

98-
return new PipelineSnapshot(pipeline, docs, executionTime);
99-
});
110+
const structuredPipeline: StructuredPipeline = new StructuredPipeline(
111+
pipeline,
112+
options,
113+
optionsOverride
114+
);
115+
116+
return firestoreClientExecutePipeline(client, structuredPipeline).then(
117+
result => {
118+
// Get the execution time from the first result.
119+
// firestoreClientExecutePipeline returns at least one PipelineStreamElement
120+
// even if the returned document set is empty.
121+
const executionTime =
122+
result.length > 0 ? result[0].executionTime?.toTimestamp() : undefined;
123+
124+
const docs = result
125+
// Currently ignore any response from ExecutePipeline that does
126+
// not contain any document data in the `fields` property.
127+
.filter(element => !!element.fields)
128+
.map(
129+
element =>
130+
new PipelineResult(
131+
pipeline._userDataWriter,
132+
element.key?.path
133+
? new DocumentReference(firestore, null, element.key)
134+
: undefined,
135+
element.fields,
136+
element.createTime?.toTimestamp(),
137+
element.updateTime?.toTimestamp()
138+
)
139+
);
140+
141+
return new PipelineSnapshot(pipeline, docs, executionTime);
142+
}
143+
);
100144
}
101145

102146
// Augment the Firestore class with the pipeline() factory method

packages/firestore/src/core/firestore_client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import {
2323
CredentialsProvider
2424
} from '../api/credentials';
2525
import { User } from '../auth/user';
26-
import { Pipeline } from '../lite-api/pipeline';
2726
import { LocalStore } from '../local/local_store';
2827
import {
2928
localStoreConfigureFieldIndexes,
@@ -88,6 +87,7 @@ import {
8887
removeSnapshotsInSyncListener
8988
} from './event_manager';
9089
import { newQueryForPath, Query } from './query';
90+
import { StructuredPipeline } from './structured_pipeline';
9191
import { SyncEngine } from './sync_engine';
9292
import {
9393
syncEngineListen,
@@ -570,7 +570,7 @@ export function firestoreClientRunAggregateQuery(
570570

571571
export function firestoreClientExecutePipeline(
572572
client: FirestoreClient,
573-
pipeline: Pipeline
573+
pipeline: StructuredPipeline
574574
): Promise<PipelineStreamElement[]> {
575575
const deferred = new Deferred<PipelineStreamElement[]>();
576576

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* @license
3+
* Copyright 2025 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import { ObjectValue } from '../model/object_value';
19+
import { FieldPath } from '../model/path';
20+
import {
21+
StructuredPipeline as StructuredPipelineProto,
22+
Pipeline as PipelineProto,
23+
ApiClientObjectMap,
24+
Value
25+
} from '../protos/firestore_proto_api';
26+
import { JsonProtoSerializer, ProtoSerializable } from '../remote/serializer';
27+
import { mapToArray } from '../util/obj';
28+
29+
export interface StructuredPipelineOptions {
30+
indexMode?: 'recommended';
31+
}
32+
33+
export class StructuredPipeline
34+
implements ProtoSerializable<StructuredPipelineProto>
35+
{
36+
constructor(
37+
private pipeline: ProtoSerializable<PipelineProto>,
38+
private options: StructuredPipelineOptions,
39+
private optionsOverride: ApiClientObjectMap<Value>
40+
) {}
41+
42+
/**
43+
* @private
44+
* @internal for testing
45+
*/
46+
_getKnownOptions(): ObjectValue {
47+
const options: ObjectValue = ObjectValue.empty();
48+
49+
// SERIALIZE KNOWN OPTIONS
50+
if (typeof this.options.indexMode === 'string') {
51+
options.set(FieldPath.fromServerFormat('index_mode'), {
52+
stringValue: this.options.indexMode
53+
});
54+
}
55+
56+
return options;
57+
}
58+
59+
private getOptionsProto(): ApiClientObjectMap<Value> {
60+
const options: ObjectValue = this._getKnownOptions();
61+
62+
// APPLY OPTIONS OVERRIDES
63+
const optionsMap = new Map(
64+
mapToArray(this.optionsOverride, (value, key) => [
65+
FieldPath.fromServerFormat(key),
66+
value
67+
])
68+
);
69+
options.setAll(optionsMap);
70+
71+
return options.value.mapValue.fields ?? {};
72+
}
73+
74+
_toProto(serializer: JsonProtoSerializer): StructuredPipelineProto {
75+
return {
76+
pipeline: this.pipeline._toProto(serializer),
77+
options: this.getOptionsProto()
78+
};
79+
}
80+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import type { Pipeline } from './pipeline';
2+
3+
/**
4+
* Options defining how a Pipeline is evaluated.
5+
*/
6+
export interface PipelineOptions {
7+
/**
8+
* Pipeline to be evaluated.
9+
*/
10+
pipeline: Pipeline;
11+
12+
/**
13+
* Specify the index mode.
14+
*/
15+
indexMode?: 'recommended';
16+
17+
/**
18+
* An escape hatch to set options not known at SDK build time. These values
19+
* will be passed directly to the Firestore backend and not used by the SDK.
20+
*
21+
* The generic option name will be used as provided. And must match the name
22+
* format used by the backend (hint: use a snake_case_name).
23+
*
24+
* Generic option values can be any type supported
25+
* by Firestore (for example: string, boolean, number, map, …). Value types
26+
* not known to the SDK will be rejected.
27+
*
28+
* Values specified in genericOptions will take precedence over any options
29+
* with the same name set by the SDK.
30+
*
31+
* Override the `example_option`:
32+
* ```
33+
* execute({
34+
* pipeline: myPipeline,
35+
* genericOptions: {
36+
* // Override `example_option`. This will not
37+
* // merge with the existing `example_option` object.
38+
* "example_option": {
39+
* foo: "bar"
40+
* }
41+
* }
42+
* }
43+
* ```
44+
*
45+
* `genericOptions` supports dot notation, if you want to override
46+
* a nested option.
47+
* ```
48+
* execute({
49+
* pipeline: myPipeline,
50+
* genericOptions: {
51+
* // Override `example_option.foo` and do not override
52+
* // any other properties of `example_option`.
53+
* "example_option.foo": "bar"
54+
* }
55+
* }
56+
* ```
57+
*/
58+
genericOptions?: {
59+
[name: string]: unknown;
60+
};
61+
}

packages/firestore/src/remote/datastore.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { User } from '../auth/user';
2020
import { Aggregate } from '../core/aggregate';
2121
import { DatabaseId } from '../core/database_info';
2222
import { queryToAggregateTarget, Query, queryToTarget } from '../core/query';
23-
import { Pipeline } from '../lite-api/pipeline';
23+
import { StructuredPipeline } from '../core/structured_pipeline';
2424
import { Document } from '../model/document';
2525
import { DocumentKey } from '../model/document_key';
2626
import { Mutation } from '../model/mutation';
@@ -244,14 +244,12 @@ export async function invokeBatchGetDocumentsRpc(
244244

245245
export async function invokeExecutePipeline(
246246
datastore: Datastore,
247-
pipeline: Pipeline
247+
structuredPipeline: StructuredPipeline
248248
): Promise<PipelineStreamElement[]> {
249249
const datastoreImpl = debugCast(datastore, DatastoreImpl);
250250
const executePipelineRequest: ProtoExecutePipelineRequest = {
251251
database: getEncodedDatabaseId(datastoreImpl.serializer),
252-
structuredPipeline: {
253-
pipeline: pipeline._toProto(datastoreImpl.serializer)
254-
}
252+
structuredPipeline: structuredPipeline._toProto(datastoreImpl.serializer)
255253
};
256254

257255
const response = await datastoreImpl.invokeStreamingRPC<

0 commit comments

Comments
 (0)