diff --git a/packages/firestore/src/api/pipeline_impl.ts b/packages/firestore/src/api/pipeline_impl.ts index ba6e08105bb..c2183ca7e3a 100644 --- a/packages/firestore/src/api/pipeline_impl.ts +++ b/packages/firestore/src/api/pipeline_impl.ts @@ -17,11 +17,22 @@ import { Pipeline } from '../api/pipeline'; import { firestoreClientExecutePipeline } from '../core/firestore_client'; +import { + StructuredPipeline, + StructuredPipelineOptions +} from '../core/structured_pipeline'; import { Pipeline as LitePipeline } from '../lite-api/pipeline'; import { PipelineResult, PipelineSnapshot } from '../lite-api/pipeline-result'; import { PipelineSource } from '../lite-api/pipeline-source'; +import { PipelineOptions } from '../lite-api/pipeline_settings'; import { Stage } from '../lite-api/stage'; -import { newUserDataReader } from '../lite-api/user_data_reader'; +import { + newUserDataReader, + parseData, + UserDataReader, + UserDataSource +} from '../lite-api/user_data_reader'; +import { ApiClientObjectMap, Value } from '../protos/firestore_proto_api'; import { cast } from '../util/input_validation'; import { ensureFirestoreConfigured, Firestore } from './database'; @@ -68,35 +79,68 @@ declare module './database' { * @param pipeline The pipeline to execute. * @return A Promise representing the asynchronous pipeline execution. */ -export function execute(pipeline: LitePipeline): Promise { +export function execute(pipeline: LitePipeline): Promise; +export function execute(options: PipelineOptions): Promise; +export function execute( + pipelineOrOptions: LitePipeline | PipelineOptions +): Promise { + const pipeline: LitePipeline = + pipelineOrOptions instanceof LitePipeline + ? pipelineOrOptions + : pipelineOrOptions.pipeline; + const options: StructuredPipelineOptions = !( + pipelineOrOptions instanceof LitePipeline + ) + ? pipelineOrOptions + : {}; + const genericOptions: { [name: string]: unknown } = + (pipelineOrOptions as PipelineOptions).genericOptions ?? {}; + const firestore = cast(pipeline._db, Firestore); const client = ensureFirestoreConfigured(firestore); - return firestoreClientExecutePipeline(client, pipeline).then(result => { - // Get the execution time from the first result. - // firestoreClientExecutePipeline returns at least one PipelineStreamElement - // even if the returned document set is empty. - const executionTime = - result.length > 0 ? result[0].executionTime?.toTimestamp() : undefined; - const docs = result - // Currently ignore any response from ExecutePipeline that does - // not contain any document data in the `fields` property. - .filter(element => !!element.fields) - .map( - element => - new PipelineResult( - pipeline._userDataWriter, - element.key?.path - ? new DocumentReference(firestore, null, element.key) - : undefined, - element.fields, - element.createTime?.toTimestamp(), - element.updateTime?.toTimestamp() - ) - ); + const udr = new UserDataReader( + firestore._databaseId, + /* ignoreUndefinedProperties */ true + ); + const context = udr.createContext(UserDataSource.Argument, 'execute'); + const optionsOverride: ApiClientObjectMap = + parseData(genericOptions, context)?.mapValue?.fields ?? {}; - return new PipelineSnapshot(pipeline, docs, executionTime); - }); + const structuredPipeline: StructuredPipeline = new StructuredPipeline( + pipeline, + options, + optionsOverride + ); + + return firestoreClientExecutePipeline(client, structuredPipeline).then( + result => { + // Get the execution time from the first result. + // firestoreClientExecutePipeline returns at least one PipelineStreamElement + // even if the returned document set is empty. + const executionTime = + result.length > 0 ? result[0].executionTime?.toTimestamp() : undefined; + + const docs = result + // Currently ignore any response from ExecutePipeline that does + // not contain any document data in the `fields` property. + .filter(element => !!element.fields) + .map( + element => + new PipelineResult( + pipeline._userDataWriter, + element.key?.path + ? new DocumentReference(firestore, null, element.key) + : undefined, + element.fields, + element.createTime?.toTimestamp(), + element.updateTime?.toTimestamp() + ) + ); + + return new PipelineSnapshot(pipeline, docs, executionTime); + } + ); } // Augment the Firestore class with the pipeline() factory method diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index bb0771d2335..0df5a692128 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -23,7 +23,6 @@ import { CredentialsProvider } from '../api/credentials'; import { User } from '../auth/user'; -import { Pipeline } from '../lite-api/pipeline'; import { LocalStore } from '../local/local_store'; import { localStoreConfigureFieldIndexes, @@ -87,6 +86,7 @@ import { removeSnapshotsInSyncListener } from './event_manager'; import { newQueryForPath, Query } from './query'; +import { StructuredPipeline } from './structured_pipeline'; import { SyncEngine } from './sync_engine'; import { syncEngineListen, @@ -557,7 +557,7 @@ export function firestoreClientRunAggregateQuery( export function firestoreClientExecutePipeline( client: FirestoreClient, - pipeline: Pipeline + pipeline: StructuredPipeline ): Promise { const deferred = new Deferred(); diff --git a/packages/firestore/src/core/structured_pipeline.ts b/packages/firestore/src/core/structured_pipeline.ts new file mode 100644 index 00000000000..ef913512275 --- /dev/null +++ b/packages/firestore/src/core/structured_pipeline.ts @@ -0,0 +1,80 @@ +/** + * @license + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ObjectValue } from '../model/object_value'; +import { FieldPath } from '../model/path'; +import { + StructuredPipeline as StructuredPipelineProto, + Pipeline as PipelineProto, + ApiClientObjectMap, + Value +} from '../protos/firestore_proto_api'; +import { JsonProtoSerializer, ProtoSerializable } from '../remote/serializer'; +import { mapToArray } from '../util/obj'; + +export interface StructuredPipelineOptions { + indexMode?: 'recommended'; +} + +export class StructuredPipeline + implements ProtoSerializable +{ + constructor( + private pipeline: ProtoSerializable, + private options: StructuredPipelineOptions, + private optionsOverride: ApiClientObjectMap + ) {} + + /** + * @private + * @internal for testing + */ + _getKnownOptions(): ObjectValue { + const options: ObjectValue = ObjectValue.empty(); + + // SERIALIZE KNOWN OPTIONS + if (typeof this.options.indexMode === 'string') { + options.set(FieldPath.fromServerFormat('index_mode'), { + stringValue: this.options.indexMode + }); + } + + return options; + } + + private getOptionsProto(): ApiClientObjectMap { + const options: ObjectValue = this._getKnownOptions(); + + // APPLY OPTIONS OVERRIDES + const optionsMap = new Map( + mapToArray(this.optionsOverride, (value, key) => [ + FieldPath.fromServerFormat(key), + value + ]) + ); + options.setAll(optionsMap); + + return options.value.mapValue.fields ?? {}; + } + + _toProto(serializer: JsonProtoSerializer): StructuredPipelineProto { + return { + pipeline: this.pipeline._toProto(serializer), + options: this.getOptionsProto() + }; + } +} diff --git a/packages/firestore/src/lite-api/pipeline_settings.ts b/packages/firestore/src/lite-api/pipeline_settings.ts new file mode 100644 index 00000000000..e23cc33b69a --- /dev/null +++ b/packages/firestore/src/lite-api/pipeline_settings.ts @@ -0,0 +1,61 @@ +import type { Pipeline } from './pipeline'; + +/** + * Options defining how a Pipeline is evaluated. + */ +export interface PipelineOptions { + /** + * Pipeline to be evaluated. + */ + pipeline: Pipeline; + + /** + * Specify the index mode. + */ + indexMode?: 'recommended'; + + /** + * An escape hatch to set options not known at SDK build time. These values + * will be passed directly to the Firestore backend and not used by the SDK. + * + * The generic option name will be used as provided. And must match the name + * format used by the backend (hint: use a snake_case_name). + * + * Generic option values can be any type supported + * by Firestore (for example: string, boolean, number, map, …). Value types + * not known to the SDK will be rejected. + * + * Values specified in genericOptions will take precedence over any options + * with the same name set by the SDK. + * + * Override the `example_option`: + * ``` + * execute({ + * pipeline: myPipeline, + * genericOptions: { + * // Override `example_option`. This will not + * // merge with the existing `example_option` object. + * "example_option": { + * foo: "bar" + * } + * } + * } + * ``` + * + * `genericOptions` supports dot notation, if you want to override + * a nested option. + * ``` + * execute({ + * pipeline: myPipeline, + * genericOptions: { + * // Override `example_option.foo` and do not override + * // any other properties of `example_option`. + * "example_option.foo": "bar" + * } + * } + * ``` + */ + genericOptions?: { + [name: string]: unknown; + }; +} diff --git a/packages/firestore/src/remote/datastore.ts b/packages/firestore/src/remote/datastore.ts index 32666feeea1..793907cc420 100644 --- a/packages/firestore/src/remote/datastore.ts +++ b/packages/firestore/src/remote/datastore.ts @@ -20,7 +20,7 @@ import { User } from '../auth/user'; import { Aggregate } from '../core/aggregate'; import { DatabaseId } from '../core/database_info'; import { queryToAggregateTarget, Query, queryToTarget } from '../core/query'; -import { Pipeline } from '../lite-api/pipeline'; +import { StructuredPipeline } from '../core/structured_pipeline'; import { Document } from '../model/document'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; @@ -242,14 +242,12 @@ export async function invokeBatchGetDocumentsRpc( export async function invokeExecutePipeline( datastore: Datastore, - pipeline: Pipeline + structuredPipeline: StructuredPipeline ): Promise { const datastoreImpl = debugCast(datastore, DatastoreImpl); const executePipelineRequest: ProtoExecutePipelineRequest = { database: getEncodedDatabaseId(datastoreImpl.serializer), - structuredPipeline: { - pipeline: pipeline._toProto(datastoreImpl.serializer) - } + structuredPipeline: structuredPipeline._toProto(datastoreImpl.serializer) }; const response = await datastoreImpl.invokeStreamingRPC< diff --git a/packages/firestore/test/unit/api/pipeline_impl.test.ts b/packages/firestore/test/unit/api/pipeline_impl.test.ts new file mode 100644 index 00000000000..c04d2cac6d5 --- /dev/null +++ b/packages/firestore/test/unit/api/pipeline_impl.test.ts @@ -0,0 +1,199 @@ +/** + * @license + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { Timestamp } from '../../../src'; +import { Firestore } from '../../../src/api/database'; +import { execute } from '../../../src/api/pipeline_impl'; +import { + MemoryOfflineComponentProvider, + OnlineComponentProvider +} from '../../../src/core/component_provider'; +import { + ExecutePipelineRequest as ProtoExecutePipelineRequest, + ExecutePipelineResponse as ProtoExecutePipelineResponse +} from '../../../src/protos/firestore_proto_api'; +import { newTestFirestore } from '../../util/api_helpers'; + +const FIRST_CALL = 0; +const EXECUTE_PIPELINE_REQUEST = 3; + +function fakePipelineResponse( + firestore: Firestore, + response?: ProtoExecutePipelineResponse[] +): sinon.SinonSpy { + response = response ?? [ + { + executionTime: Timestamp.now().toDate().toISOString(), + results: [] + } + ]; + const fake = sinon.fake.resolves(response); + + firestore._componentsProvider = { + _offline: { + build: () => new MemoryOfflineComponentProvider() + }, + _online: { + build: () => { + const provider = new OnlineComponentProvider(); + const ogCreateDatastore = provider.createDatastore.bind(provider); + provider.createDatastore = config => { + const datastore = ogCreateDatastore(config); + // @ts-ignore + datastore.invokeStreamingRPC = fake; + return datastore; + }; + return provider; + } + } + }; + + return fake; +} + +describe('execute(Pipeline|PipelineOptions)', () => { + it('returns execution time with empty results', async () => { + const firestore = newTestFirestore(); + + const executeTime = Timestamp.now(); + const spy = fakePipelineResponse(firestore, [ + { + executionTime: executeTime.toDate().toISOString(), + results: [] + } + ]); + + const pipelineSnapshot = await execute( + firestore.pipeline().collection('foo') + ); + + expect(pipelineSnapshot.results.length).to.equal(0); + expect(spy.calledOnce); + + expect(pipelineSnapshot.executionTime.toJSON()).to.deep.equal( + executeTime.toJSON() + ); + }); + + it('serializes the pipeline', async () => { + const firestore = newTestFirestore(); + const spy = fakePipelineResponse(firestore); + + await execute({ + pipeline: firestore.pipeline().collection('foo') + }); + + const executePipelineRequest: ProtoExecutePipelineRequest = { + database: 'projects/new-project/databases/(default)', + structuredPipeline: { + 'options': {}, + 'pipeline': { + 'stages': [ + { + 'args': [ + { + 'referenceValue': '/foo' + } + ], + 'name': 'collection' + } + ] + } + } + }; + expect(spy.args[FIRST_CALL][EXECUTE_PIPELINE_REQUEST]).to.deep.equal( + executePipelineRequest + ); + }); + + it('serializes the pipeline options', async () => { + const firestore = newTestFirestore(); + const spy = fakePipelineResponse(firestore); + + await execute({ + pipeline: firestore.pipeline().collection('foo'), + indexMode: 'recommended' + }); + + const executePipelineRequest: ProtoExecutePipelineRequest = { + database: 'projects/new-project/databases/(default)', + structuredPipeline: { + 'options': { + 'index_mode': { + 'stringValue': 'recommended' + } + }, + 'pipeline': { + 'stages': [ + { + 'args': [ + { + 'referenceValue': '/foo' + } + ], + 'name': 'collection' + } + ] + } + } + }; + expect(spy.args[FIRST_CALL][EXECUTE_PIPELINE_REQUEST]).to.deep.equal( + executePipelineRequest + ); + }); + + it('serializes the pipeline generic options', async () => { + const firestore = newTestFirestore(); + const spy = fakePipelineResponse(firestore); + + await execute({ + pipeline: firestore.pipeline().collection('foo'), + genericOptions: { + 'foo': 'bar' + } + }); + + const executePipelineRequest: ProtoExecutePipelineRequest = { + database: 'projects/new-project/databases/(default)', + structuredPipeline: { + 'options': { + 'foo': { + 'stringValue': 'bar' + } + }, + 'pipeline': { + 'stages': [ + { + 'args': [ + { + 'referenceValue': '/foo' + } + ], + 'name': 'collection' + } + ] + } + } + }; + expect(spy.args[FIRST_CALL][EXECUTE_PIPELINE_REQUEST]).to.deep.equal( + executePipelineRequest + ); + }); +}); diff --git a/packages/firestore/test/unit/core/structured_pipeline.test.ts b/packages/firestore/test/unit/core/structured_pipeline.test.ts new file mode 100644 index 00000000000..920108df710 --- /dev/null +++ b/packages/firestore/test/unit/core/structured_pipeline.test.ts @@ -0,0 +1,331 @@ +/** + * @license + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { DatabaseId } from '../../../src/core/database_info'; +import { StructuredPipeline } from '../../../src/core/structured_pipeline'; +import { ObjectValue } from '../../../src/model/object_value'; +import { Pipeline as PipelineProto } from '../../../src/protos/firestore_proto_api'; +import { + JsonProtoSerializer, + ProtoSerializable +} from '../../../src/remote/serializer'; + +describe('StructuredPipeline', () => { + it('should serialize the pipeline argument', () => { + const pipeline: ProtoSerializable = { + _toProto: sinon.fake.returns({} as PipelineProto) + }; + const structuredPipeline = new StructuredPipeline(pipeline, {}, {}); + + const proto = structuredPipeline._toProto( + new JsonProtoSerializer(DatabaseId.empty(), false) + ); + + expect(proto).to.deep.equal({ + pipeline: {}, + options: {} + }); + + expect((pipeline._toProto as sinon.SinonSpy).calledOnce).to.be.true; + }); + + it('should support known options', () => { + const pipeline: ProtoSerializable = { + _toProto: sinon.fake.returns({} as PipelineProto) + }; + const structuredPipeline = new StructuredPipeline( + pipeline, + { + indexMode: 'recommended' + }, + {} + ); + + const proto = structuredPipeline._toProto( + new JsonProtoSerializer(DatabaseId.empty(), false) + ); + + expect(proto).to.deep.equal({ + pipeline: {}, + options: { + 'index_mode': { + stringValue: 'recommended' + } + } + }); + + expect((pipeline._toProto as sinon.SinonSpy).calledOnce).to.be.true; + }); + + it('should support unknown options', () => { + const pipeline: ProtoSerializable = { + _toProto: sinon.fake.returns({} as PipelineProto) + }; + const structuredPipeline = new StructuredPipeline( + pipeline, + {}, + { + 'foo_bar': { stringValue: 'baz' } + } + ); + + const proto = structuredPipeline._toProto( + new JsonProtoSerializer(DatabaseId.empty(), false) + ); + + expect(proto).to.deep.equal({ + pipeline: {}, + options: { + 'foo_bar': { + stringValue: 'baz' + } + } + }); + + expect((pipeline._toProto as sinon.SinonSpy).calledOnce).to.be.true; + }); + + it('should support unknown nested options', () => { + const pipeline: ProtoSerializable = { + _toProto: sinon.fake.returns({} as PipelineProto) + }; + const structuredPipeline = new StructuredPipeline( + pipeline, + {}, + { + 'foo.bar': { stringValue: 'baz' } + } + ); + + const proto = structuredPipeline._toProto( + new JsonProtoSerializer(DatabaseId.empty(), false) + ); + + expect(proto).to.deep.equal({ + pipeline: {}, + options: { + 'foo': { + mapValue: { + fields: { + 'bar': { stringValue: 'baz' } + } + } + } + } + }); + + expect((pipeline._toProto as sinon.SinonSpy).calledOnce).to.be.true; + }); + + it('should support options override', () => { + const pipeline: ProtoSerializable = { + _toProto: sinon.fake.returns({} as PipelineProto) + }; + const structuredPipeline = new StructuredPipeline( + pipeline, + { + indexMode: 'recommended' + }, + { + 'index_mode': { stringValue: 'baz' } + } + ); + + const proto = structuredPipeline._toProto( + new JsonProtoSerializer(DatabaseId.empty(), false) + ); + + expect(proto).to.deep.equal({ + pipeline: {}, + options: { + 'index_mode': { + stringValue: 'baz' + } + } + }); + + expect((pipeline._toProto as sinon.SinonSpy).calledOnce).to.be.true; + }); + + it('should support options override of nested field', () => { + const pipeline: ProtoSerializable = { + _toProto: sinon.fake.returns({} as PipelineProto) + }; + + const structuredPipeline = new StructuredPipeline( + pipeline, + {}, + { + 'foo.bar': { integerValue: 123 } + } + ); + + // Fake known options with a nested {foo: {bar: "baz"}} + structuredPipeline._getKnownOptions = sinon.fake.returns( + new ObjectValue({ + mapValue: { + fields: { + 'foo': { + mapValue: { + fields: { + 'bar': { stringValue: 'baz' }, + 'waldo': { booleanValue: true } + } + } + } + } + } + }) + ); + + const proto = structuredPipeline._toProto( + new JsonProtoSerializer(DatabaseId.empty(), false) + ); + + expect(proto).to.deep.equal({ + pipeline: {}, + options: { + 'foo': { + mapValue: { + fields: { + 'bar': { + integerValue: 123 + }, + 'waldo': { + booleanValue: true + } + } + } + } + } + }); + + expect((pipeline._toProto as sinon.SinonSpy).calledOnce).to.be.true; + }); + + it('will replace a nested object if given a new object', () => { + const pipeline: ProtoSerializable = { + _toProto: sinon.fake.returns({} as PipelineProto) + }; + + const structuredPipeline = new StructuredPipeline( + pipeline, + {}, + { + 'foo': { mapValue: { fields: { bar: { integerValue: 123 } } } } + } + ); + + // Fake known options with a nested {foo: {bar: "baz"}} + structuredPipeline._getKnownOptions = sinon.fake.returns( + new ObjectValue({ + mapValue: { + fields: { + 'foo': { + mapValue: { + fields: { + 'bar': { stringValue: 'baz' }, + 'waldo': { booleanValue: true } + } + } + } + } + } + }) + ); + + const proto = structuredPipeline._toProto( + new JsonProtoSerializer(DatabaseId.empty(), false) + ); + + expect(proto).to.deep.equal({ + pipeline: {}, + options: { + 'foo': { + mapValue: { + fields: { + 'bar': { + integerValue: 123 + } + } + } + } + } + }); + + expect((pipeline._toProto as sinon.SinonSpy).calledOnce).to.be.true; + }); + + it('will replace a top level property that is not an object if given a nested field with dot notation', () => { + const pipeline: ProtoSerializable = { + _toProto: sinon.fake.returns({} as PipelineProto) + }; + + const structuredPipeline = new StructuredPipeline( + pipeline, + {}, + { + 'foo': { + mapValue: { + fields: { + 'bar': { stringValue: '123' }, + 'waldo': { booleanValue: true } + } + } + } + } + ); + + // Fake known options with a nested {foo: {bar: "baz"}} + structuredPipeline._getKnownOptions = sinon.fake.returns( + new ObjectValue({ + mapValue: { + fields: { + 'foo': { integerValue: 123 } + } + } + }) + ); + + const proto = structuredPipeline._toProto( + new JsonProtoSerializer(DatabaseId.empty(), false) + ); + + expect(proto).to.deep.equal({ + pipeline: {}, + options: { + 'foo': { + mapValue: { + fields: { + 'bar': { + stringValue: '123' + }, + 'waldo': { + booleanValue: true + } + } + } + } + } + }); + + expect((pipeline._toProto as sinon.SinonSpy).calledOnce).to.be.true; + }); +});