From 63d4e5cc81a02853f90fe66e045c11d7dbf656c8 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 11 Nov 2025 00:11:47 +0000 Subject: [PATCH 01/17] test signing commit From de1c09cc89e5753aeb567e5bb288a6759e488883 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 11 Nov 2025 23:27:21 +0000 Subject: [PATCH 02/17] Support setting project ID and database ID in FirestoreV1 Reads --- .../sdk/io/gcp/firestore/FirestoreV1.java | 321 +++++++++++------- 1 file changed, 205 insertions(+), 116 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java index 446d097a8ed8..60fe3c136467 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java @@ -595,8 +595,11 @@ private ListCollectionIds( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } @Override @@ -613,7 +616,8 @@ public PCollection expand(PCollection input) { @Override public Builder toBuilder() { - return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + return new Builder( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } /** @@ -653,8 +657,16 @@ private Builder( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); } @Override @@ -667,9 +679,16 @@ ListCollectionIds buildSafe( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { return new ListCollectionIds( - clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); } } } @@ -710,8 +729,11 @@ private ListDocuments( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } @Override @@ -728,7 +750,8 @@ public PCollection expand(PCollection input) { @Override public Builder toBuilder() { - return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + return new Builder( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } /** @@ -768,8 +791,16 @@ private Builder( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); } @Override @@ -782,8 +813,16 @@ ListDocuments buildSafe( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - return new ListDocuments(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + return new ListDocuments( + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); } } } @@ -824,8 +863,11 @@ private RunQuery( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } @Override @@ -841,7 +883,8 @@ public PCollection expand(PCollection input) @Override public Builder toBuilder() { - return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + return new Builder( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } /** @@ -881,8 +924,16 @@ private Builder( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); } @Override @@ -895,8 +946,16 @@ RunQuery buildSafe( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - return new RunQuery(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + return new RunQuery( + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); } } } @@ -937,8 +996,11 @@ private BatchGetDocuments( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } @Override @@ -955,7 +1017,8 @@ public PCollection expand( @Override public Builder toBuilder() { - return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + return new Builder( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } /** @@ -995,8 +1058,16 @@ public Builder( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); } @Override @@ -1009,9 +1080,16 @@ BatchGetDocuments buildSafe( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { return new BatchGetDocuments( - clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); } } } @@ -1061,8 +1139,11 @@ private PartitionQuery( FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, boolean nameOnlyQuery, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); this.nameOnlyQuery = nameOnlyQuery; } @@ -1106,7 +1187,13 @@ public RunQueryRequest apply(RunQueryRequest input) { @Override public Builder toBuilder() { return new Builder( - clock, firestoreStatefulComponentFactory, rpcQosOptions, nameOnlyQuery, readTime); + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + nameOnlyQuery, + readTime, + projectId, + databaseId); } /** @@ -1149,8 +1236,16 @@ public Builder( FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, boolean nameOnlyQuery, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + readTime, + projectId, + databaseId); this.nameOnlyQuery = nameOnlyQuery; } @@ -1175,9 +1270,17 @@ PartitionQuery buildSafe( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { return new PartitionQuery( - clock, firestoreStatefulComponentFactory, rpcQosOptions, nameOnlyQuery, readTime); + clock, + firestoreStatefulComponentFactory, + rpcQosOptions, + nameOnlyQuery, + readTime, + projectId, + databaseId); } } @@ -1365,18 +1468,13 @@ public static final class BatchWriteWithSummary BatchWriteWithSummary, BatchWriteWithSummary.Builder> { - private final @Nullable String projectId; - private final @Nullable String databaseId; - public BatchWriteWithSummary( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, @Nullable String projectId, @Nullable String databaseId) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions); - this.projectId = projectId; - this.databaseId = databaseId; + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, projectId, databaseId); } @Override @@ -1396,7 +1494,8 @@ public PCollection expand( @Override public Builder toBuilder() { - return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions); + return new Builder( + clock, firestoreStatefulComponentFactory, rpcQosOptions, projectId, databaseId); } /** @@ -1429,9 +1528,6 @@ public static final class Builder BatchWriteWithSummary, BatchWriteWithSummary.Builder> { - private @Nullable String projectId; - private @Nullable String databaseId; - private Builder() { super(); } @@ -1439,39 +1535,15 @@ private Builder() { private Builder( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, - RpcQosOptions rpcQosOptions) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions); - } - - /** Set the GCP project ID to be used by the Firestore client. */ - private Builder setProjectId(@Nullable String projectId) { - this.projectId = projectId; - return this; - } - - /** Set the Firestore database ID (e.g., "(default)"). */ - private Builder setDatabaseId(@Nullable String databaseId) { - this.databaseId = databaseId; - return this; - } - - @VisibleForTesting - @Nullable - String getProjectId() { - return this.projectId; - } - - @VisibleForTesting - @Nullable - String getDatabaseId() { - return this.databaseId; + RpcQosOptions rpcQosOptions, + @Nullable String projectId, + @Nullable String databaseId) { + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, projectId, databaseId); } public BatchWriteWithDeadLetterQueue.Builder withDeadLetterQueue() { return new BatchWriteWithDeadLetterQueue.Builder( - clock, firestoreStatefulComponentFactory, rpcQosOptions) - .setProjectId(projectId) - .setDatabaseId(databaseId); + clock, firestoreStatefulComponentFactory, rpcQosOptions, projectId, databaseId); } @Override @@ -1530,18 +1602,13 @@ public static final class BatchWriteWithDeadLetterQueue BatchWriteWithDeadLetterQueue, BatchWriteWithDeadLetterQueue.Builder> { - private final @Nullable String projectId; - private final @Nullable String databaseId; - private BatchWriteWithDeadLetterQueue( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, @Nullable String projectId, @Nullable String databaseId) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions); - this.projectId = projectId; - this.databaseId = databaseId; + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, projectId, databaseId); } @Override @@ -1560,7 +1627,8 @@ public PCollection expand(PCollection { - private @Nullable String projectId; - private @Nullable String databaseId; - private Builder() { super(); } - private Builder setProjectId(@Nullable String projectId) { - this.projectId = projectId; - return this; - } - - private Builder setDatabaseId(@Nullable String databaseId) { - this.databaseId = databaseId; - return this; - } - - @VisibleForTesting - @Nullable - String getProjectId() { - return this.projectId; - } - - @VisibleForTesting - @Nullable - String getDatabaseId() { - return this.databaseId; - } - private Builder( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, - RpcQosOptions rpcQosOptions) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions); + RpcQosOptions rpcQosOptions, + @Nullable String projectId, + @Nullable String databaseId) { + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, projectId, databaseId); } @Override @@ -1790,14 +1835,20 @@ private abstract static class Transform< final JodaClock clock; final FirestoreStatefulComponentFactory firestoreStatefulComponentFactory; final RpcQosOptions rpcQosOptions; + final @Nullable String projectId; + final @Nullable String databaseId; Transform( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, - RpcQosOptions rpcQosOptions) { + RpcQosOptions rpcQosOptions, + @Nullable String projectId, + @Nullable String databaseId) { this.clock = clock; this.firestoreStatefulComponentFactory = firestoreStatefulComponentFactory; this.rpcQosOptions = rpcQosOptions; + this.projectId = projectId; + this.databaseId = databaseId; } @Override @@ -1838,20 +1889,28 @@ abstract static class Builder< JodaClock clock; FirestoreStatefulComponentFactory firestoreStatefulComponentFactory; RpcQosOptions rpcQosOptions; + @Nullable String projectId; + @Nullable String databaseId; Builder() { clock = JodaClock.DEFAULT; firestoreStatefulComponentFactory = FirestoreStatefulComponentFactory.INSTANCE; rpcQosOptions = RpcQosOptions.defaultOptions(); + projectId = null; + databaseId = null; } private Builder( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, - RpcQosOptions rpcQosOptions) { + RpcQosOptions rpcQosOptions, + @Nullable String projectId, + @Nullable String databaseId) { this.clock = clock; this.firestoreStatefulComponentFactory = firestoreStatefulComponentFactory; this.rpcQosOptions = rpcQosOptions; + this.projectId = projectId; + this.databaseId = databaseId; } /** @@ -1934,6 +1993,28 @@ public final BldrT withRpcQosOptions(RpcQosOptions rpcQosOptions) { this.rpcQosOptions = rpcQosOptions; return self(); } + + public final BldrT setProjectId(@Nullable String projectId) { + this.projectId = projectId; + return self(); + } + + public final BldrT setDatabaseId(@Nullable String databaseId) { + this.databaseId = databaseId; + return self(); + } + + @VisibleForTesting + @Nullable + String getProjectId() { + return this.projectId; + } + + @VisibleForTesting + @Nullable + String getDatabaseId() { + return this.databaseId; + } } } @@ -1950,8 +2031,10 @@ private abstract static class ReadTransform< JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, projectId, databaseId); this.readTime = readTime; } @@ -1975,8 +2058,10 @@ private Builder( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, projectId, databaseId); this.readTime = readTime; } @@ -1986,7 +2071,9 @@ final TrfmT genericBuild() { requireNonNull(clock, "clock must be non null"), requireNonNull(firestoreStatefulComponentFactory, "firestoreFactory must be non null"), requireNonNull(rpcQosOptions, "rpcQosOptions must be non null"), - readTime); + readTime, + projectId, + databaseId); } @Override @@ -2001,7 +2088,9 @@ abstract TrfmT buildSafe( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId); public final BldrT withReadTime(@Nullable Instant readTime) { this.readTime = readTime; From 133d5a612f7ff45c783bdf6a801efc62ee44b7d5 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Mon, 24 Nov 2025 21:14:00 +0000 Subject: [PATCH 03/17] WIP adding database ID to read fn --- .../FirestoreStatefulComponentFactory.java | 30 +++++++++++-- .../io/gcp/firestore/FirestoreV1ReadFn.java | 44 ++++++++++++++++++- .../io/gcp/firestore/FirestoreV1WriteFn.java | 4 +- 3 files changed, 71 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 4e8c11f7072c..923f0819feeb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -29,6 +29,7 @@ import java.io.Serializable; import java.security.SecureRandom; import java.util.Map; +import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -65,6 +66,26 @@ private FirestoreStatefulComponentFactory() {} * @return a new {@link FirestoreStub} pre-configured with values from the provided options */ FirestoreStub getFirestoreStub(PipelineOptions options) { + return getFirestoreStub(options, null, null); + } + + /** + * Given a {@link PipelineOptions}, return a pre-configured {@link FirestoreStub} with values set + * based on those options. + * + *

The provided {@link PipelineOptions} is expected to provide {@link FirestoreOptions} and + * {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions GcpOptions} for access to {@link + * GcpOptions#getProject()} + * + *

The instance returned by this method is expected to bind to the lifecycle of a bundle. + * + * @param options The instance of options to read from + * @param projectId The Project ID to target. + * @param databaseId The Database ID to target. + * @return a new {@link FirestoreStub} pre-configured with values from the provided options + */ + FirestoreStub getFirestoreStub( + PipelineOptions options, @Nullable String projectId, @Nullable String databaseId) { try { FirestoreSettings.Builder builder = FirestoreSettings.newBuilder(); @@ -96,10 +117,11 @@ FirestoreStub getFirestoreStub(PipelineOptions options) { .setEndpoint(firestoreOptions.getFirestoreHost()); headers.put( "x-goog-request-params", - "project_id=" - + gcpOptions.getProject() - + "&database_id=" - + firestoreOptions.getFirestoreDb()); + "project_id=" + projectId != null + ? projectId + : gcpOptions.getProject() + "&database_id=" + databaseId != null + ? databaseId + : firestoreOptions.getFirestoreDb()); } builder.setHeaderProvider( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java index 51e5efa380e8..e23090e68727 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java @@ -35,6 +35,7 @@ import com.google.firestore.v1.BatchGetDocumentsResponse; import com.google.firestore.v1.BatchGetDocumentsResponse.ResultCase; import com.google.firestore.v1.Cursor; +import com.google.firestore.v1.DatabaseRootName; import com.google.firestore.v1.ListCollectionIdsRequest; import com.google.firestore.v1.ListCollectionIdsResponse; import com.google.firestore.v1.ListDocumentsRequest; @@ -139,6 +140,12 @@ protected RunQueryRequest setReadTime(RunQueryRequest element, Instant readTime) return element.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } + @Override + protected RunQueryRequest setDatabase( + RunQueryRequest element, String projectId, String databaseId) { + return element.toBuilder().setParent(DocumentRootName.format(projectId, databaseId)).build(); + } + @Override protected @Nullable RunQueryResponse resumptionValue( @Nullable RunQueryResponse previousValue, RunQueryResponse nextValue) { @@ -199,6 +206,10 @@ public void processElement(ProcessContext context) throws Exception { try { PartitionQueryRequest request = setPageToken(element, aggregate); request = readTime == null ? request : setReadTime(request, readTime); + request = + projectId == null || databaseId == null + ? request + : setDatabase(request, projectId, databaseId); attempt.recordRequestStart(clock.instant()); PartitionQueryPagedResponse pagedResponse = firestoreStub.partitionQueryPagedCallable().call(request); @@ -244,6 +255,12 @@ private PartitionQueryRequest setPageToken( protected PartitionQueryRequest setReadTime(PartitionQueryRequest element, Instant readTime) { return element.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } + + @Override + protected PartitionQueryRequest setDatabase( + PartitionQueryRequest element, String projectId, String databaseId) { + return element.toBuilder().setParent().build(); + } } /** @@ -609,7 +626,9 @@ abstract static class BaseFirestoreV1ReadFn // transient running state information, not important to any possible checkpointing protected transient FirestoreStub firestoreStub; protected transient RpcQos rpcQos; + private transient DatabaseRootName databaseRootName; protected transient String projectId; + protected transient @Nullable String databaseId; @SuppressWarnings( "initialization.fields.uninitialized") // allow transient fields to be managed by component @@ -618,12 +637,16 @@ protected BaseFirestoreV1ReadFn( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { this.clock = requireNonNull(clock, "clock must be non null"); this.firestoreStatefulComponentFactory = requireNonNull(firestoreStatefulComponentFactory, "firestoreFactory must be non null"); this.rpcQosOptions = requireNonNull(rpcQosOptions, "rpcQosOptions must be non null"); this.readTime = readTime; + this.projectId = projectId; + this.databaseId = databaseId; } /** {@inheritDoc} */ @@ -635,7 +658,10 @@ public void setup() { /** {@inheritDoc} */ @Override public final void startBundle(StartBundleContext c) { - String project = c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreProject(); + String project = + this.projectId != null + ? this.projectId + : c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreProject(); if (project == null) { project = c.getPipelineOptions().as(GcpOptions.class).getProject(); } @@ -643,6 +669,18 @@ public final void startBundle(StartBundleContext c) { requireNonNull( project, "project must be defined on FirestoreOptions or GcpOptions of PipelineOptions"); + databaseId = + this.databaseId != null + ? this.databaseId + : c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreDb(); + databaseRootName = + DatabaseRootName.of( + requireNonNull( + project, + "project must be defined on FirestoreOptions or GcpOptions of PipelineOptions"), + requireNonNull( + databaseId, + "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); } @@ -662,6 +700,8 @@ public final void populateDisplayData(DisplayData.Builder builder) { } protected abstract InT setReadTime(InT element, Instant readTime); + + protected abstract InT setDatabase(InT element, String projectId, String databaseId); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java index 6bbb00e76f2d..ab33d8e5c166 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java @@ -244,7 +244,9 @@ public final void startBundle(StartBundleContext c) { requireNonNull( databaseId, "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); - firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); + firestoreStub = + firestoreStatefulComponentFactory.getFirestoreStub( + c.getPipelineOptions(), project, databaseId); } /** From f95d98892882a1d14746977dde8d734fb8890190 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 00:22:24 +0000 Subject: [PATCH 04/17] Revert "WIP adding database ID to read fn" This reverts commit 133d5a612f7ff45c783bdf6a801efc62ee44b7d5. --- .../FirestoreStatefulComponentFactory.java | 30 ++----------- .../io/gcp/firestore/FirestoreV1ReadFn.java | 44 +------------------ .../io/gcp/firestore/FirestoreV1WriteFn.java | 4 +- 3 files changed, 7 insertions(+), 71 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 923f0819feeb..4e8c11f7072c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -29,7 +29,6 @@ import java.io.Serializable; import java.security.SecureRandom; import java.util.Map; -import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -66,26 +65,6 @@ private FirestoreStatefulComponentFactory() {} * @return a new {@link FirestoreStub} pre-configured with values from the provided options */ FirestoreStub getFirestoreStub(PipelineOptions options) { - return getFirestoreStub(options, null, null); - } - - /** - * Given a {@link PipelineOptions}, return a pre-configured {@link FirestoreStub} with values set - * based on those options. - * - *

The provided {@link PipelineOptions} is expected to provide {@link FirestoreOptions} and - * {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions GcpOptions} for access to {@link - * GcpOptions#getProject()} - * - *

The instance returned by this method is expected to bind to the lifecycle of a bundle. - * - * @param options The instance of options to read from - * @param projectId The Project ID to target. - * @param databaseId The Database ID to target. - * @return a new {@link FirestoreStub} pre-configured with values from the provided options - */ - FirestoreStub getFirestoreStub( - PipelineOptions options, @Nullable String projectId, @Nullable String databaseId) { try { FirestoreSettings.Builder builder = FirestoreSettings.newBuilder(); @@ -117,11 +96,10 @@ FirestoreStub getFirestoreStub( .setEndpoint(firestoreOptions.getFirestoreHost()); headers.put( "x-goog-request-params", - "project_id=" + projectId != null - ? projectId - : gcpOptions.getProject() + "&database_id=" + databaseId != null - ? databaseId - : firestoreOptions.getFirestoreDb()); + "project_id=" + + gcpOptions.getProject() + + "&database_id=" + + firestoreOptions.getFirestoreDb()); } builder.setHeaderProvider( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java index e23090e68727..51e5efa380e8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java @@ -35,7 +35,6 @@ import com.google.firestore.v1.BatchGetDocumentsResponse; import com.google.firestore.v1.BatchGetDocumentsResponse.ResultCase; import com.google.firestore.v1.Cursor; -import com.google.firestore.v1.DatabaseRootName; import com.google.firestore.v1.ListCollectionIdsRequest; import com.google.firestore.v1.ListCollectionIdsResponse; import com.google.firestore.v1.ListDocumentsRequest; @@ -140,12 +139,6 @@ protected RunQueryRequest setReadTime(RunQueryRequest element, Instant readTime) return element.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } - @Override - protected RunQueryRequest setDatabase( - RunQueryRequest element, String projectId, String databaseId) { - return element.toBuilder().setParent(DocumentRootName.format(projectId, databaseId)).build(); - } - @Override protected @Nullable RunQueryResponse resumptionValue( @Nullable RunQueryResponse previousValue, RunQueryResponse nextValue) { @@ -206,10 +199,6 @@ public void processElement(ProcessContext context) throws Exception { try { PartitionQueryRequest request = setPageToken(element, aggregate); request = readTime == null ? request : setReadTime(request, readTime); - request = - projectId == null || databaseId == null - ? request - : setDatabase(request, projectId, databaseId); attempt.recordRequestStart(clock.instant()); PartitionQueryPagedResponse pagedResponse = firestoreStub.partitionQueryPagedCallable().call(request); @@ -255,12 +244,6 @@ private PartitionQueryRequest setPageToken( protected PartitionQueryRequest setReadTime(PartitionQueryRequest element, Instant readTime) { return element.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } - - @Override - protected PartitionQueryRequest setDatabase( - PartitionQueryRequest element, String projectId, String databaseId) { - return element.toBuilder().setParent().build(); - } } /** @@ -626,9 +609,7 @@ abstract static class BaseFirestoreV1ReadFn // transient running state information, not important to any possible checkpointing protected transient FirestoreStub firestoreStub; protected transient RpcQos rpcQos; - private transient DatabaseRootName databaseRootName; protected transient String projectId; - protected transient @Nullable String databaseId; @SuppressWarnings( "initialization.fields.uninitialized") // allow transient fields to be managed by component @@ -637,16 +618,12 @@ protected BaseFirestoreV1ReadFn( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime, - @Nullable String projectId, - @Nullable String databaseId) { + @Nullable Instant readTime) { this.clock = requireNonNull(clock, "clock must be non null"); this.firestoreStatefulComponentFactory = requireNonNull(firestoreStatefulComponentFactory, "firestoreFactory must be non null"); this.rpcQosOptions = requireNonNull(rpcQosOptions, "rpcQosOptions must be non null"); this.readTime = readTime; - this.projectId = projectId; - this.databaseId = databaseId; } /** {@inheritDoc} */ @@ -658,10 +635,7 @@ public void setup() { /** {@inheritDoc} */ @Override public final void startBundle(StartBundleContext c) { - String project = - this.projectId != null - ? this.projectId - : c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreProject(); + String project = c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreProject(); if (project == null) { project = c.getPipelineOptions().as(GcpOptions.class).getProject(); } @@ -669,18 +643,6 @@ public final void startBundle(StartBundleContext c) { requireNonNull( project, "project must be defined on FirestoreOptions or GcpOptions of PipelineOptions"); - databaseId = - this.databaseId != null - ? this.databaseId - : c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreDb(); - databaseRootName = - DatabaseRootName.of( - requireNonNull( - project, - "project must be defined on FirestoreOptions or GcpOptions of PipelineOptions"), - requireNonNull( - databaseId, - "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); } @@ -700,8 +662,6 @@ public final void populateDisplayData(DisplayData.Builder builder) { } protected abstract InT setReadTime(InT element, Instant readTime); - - protected abstract InT setDatabase(InT element, String projectId, String databaseId); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java index ab33d8e5c166..6bbb00e76f2d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java @@ -244,9 +244,7 @@ public final void startBundle(StartBundleContext c) { requireNonNull( databaseId, "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); - firestoreStub = - firestoreStatefulComponentFactory.getFirestoreStub( - c.getPipelineOptions(), project, databaseId); + firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); } /** From 781e90381c6155a68cb7ce3f755986f32ad49a04 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 00:26:58 +0000 Subject: [PATCH 05/17] Reapply "WIP adding database ID to read fn" This reverts commit f95d98892882a1d14746977dde8d734fb8890190. --- .../FirestoreStatefulComponentFactory.java | 30 +++++++++++-- .../io/gcp/firestore/FirestoreV1ReadFn.java | 44 ++++++++++++++++++- .../io/gcp/firestore/FirestoreV1WriteFn.java | 4 +- 3 files changed, 71 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 4e8c11f7072c..923f0819feeb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -29,6 +29,7 @@ import java.io.Serializable; import java.security.SecureRandom; import java.util.Map; +import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -65,6 +66,26 @@ private FirestoreStatefulComponentFactory() {} * @return a new {@link FirestoreStub} pre-configured with values from the provided options */ FirestoreStub getFirestoreStub(PipelineOptions options) { + return getFirestoreStub(options, null, null); + } + + /** + * Given a {@link PipelineOptions}, return a pre-configured {@link FirestoreStub} with values set + * based on those options. + * + *

The provided {@link PipelineOptions} is expected to provide {@link FirestoreOptions} and + * {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions GcpOptions} for access to {@link + * GcpOptions#getProject()} + * + *

The instance returned by this method is expected to bind to the lifecycle of a bundle. + * + * @param options The instance of options to read from + * @param projectId The Project ID to target. + * @param databaseId The Database ID to target. + * @return a new {@link FirestoreStub} pre-configured with values from the provided options + */ + FirestoreStub getFirestoreStub( + PipelineOptions options, @Nullable String projectId, @Nullable String databaseId) { try { FirestoreSettings.Builder builder = FirestoreSettings.newBuilder(); @@ -96,10 +117,11 @@ FirestoreStub getFirestoreStub(PipelineOptions options) { .setEndpoint(firestoreOptions.getFirestoreHost()); headers.put( "x-goog-request-params", - "project_id=" - + gcpOptions.getProject() - + "&database_id=" - + firestoreOptions.getFirestoreDb()); + "project_id=" + projectId != null + ? projectId + : gcpOptions.getProject() + "&database_id=" + databaseId != null + ? databaseId + : firestoreOptions.getFirestoreDb()); } builder.setHeaderProvider( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java index 51e5efa380e8..e23090e68727 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java @@ -35,6 +35,7 @@ import com.google.firestore.v1.BatchGetDocumentsResponse; import com.google.firestore.v1.BatchGetDocumentsResponse.ResultCase; import com.google.firestore.v1.Cursor; +import com.google.firestore.v1.DatabaseRootName; import com.google.firestore.v1.ListCollectionIdsRequest; import com.google.firestore.v1.ListCollectionIdsResponse; import com.google.firestore.v1.ListDocumentsRequest; @@ -139,6 +140,12 @@ protected RunQueryRequest setReadTime(RunQueryRequest element, Instant readTime) return element.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } + @Override + protected RunQueryRequest setDatabase( + RunQueryRequest element, String projectId, String databaseId) { + return element.toBuilder().setParent(DocumentRootName.format(projectId, databaseId)).build(); + } + @Override protected @Nullable RunQueryResponse resumptionValue( @Nullable RunQueryResponse previousValue, RunQueryResponse nextValue) { @@ -199,6 +206,10 @@ public void processElement(ProcessContext context) throws Exception { try { PartitionQueryRequest request = setPageToken(element, aggregate); request = readTime == null ? request : setReadTime(request, readTime); + request = + projectId == null || databaseId == null + ? request + : setDatabase(request, projectId, databaseId); attempt.recordRequestStart(clock.instant()); PartitionQueryPagedResponse pagedResponse = firestoreStub.partitionQueryPagedCallable().call(request); @@ -244,6 +255,12 @@ private PartitionQueryRequest setPageToken( protected PartitionQueryRequest setReadTime(PartitionQueryRequest element, Instant readTime) { return element.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } + + @Override + protected PartitionQueryRequest setDatabase( + PartitionQueryRequest element, String projectId, String databaseId) { + return element.toBuilder().setParent().build(); + } } /** @@ -609,7 +626,9 @@ abstract static class BaseFirestoreV1ReadFn // transient running state information, not important to any possible checkpointing protected transient FirestoreStub firestoreStub; protected transient RpcQos rpcQos; + private transient DatabaseRootName databaseRootName; protected transient String projectId; + protected transient @Nullable String databaseId; @SuppressWarnings( "initialization.fields.uninitialized") // allow transient fields to be managed by component @@ -618,12 +637,16 @@ protected BaseFirestoreV1ReadFn( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { this.clock = requireNonNull(clock, "clock must be non null"); this.firestoreStatefulComponentFactory = requireNonNull(firestoreStatefulComponentFactory, "firestoreFactory must be non null"); this.rpcQosOptions = requireNonNull(rpcQosOptions, "rpcQosOptions must be non null"); this.readTime = readTime; + this.projectId = projectId; + this.databaseId = databaseId; } /** {@inheritDoc} */ @@ -635,7 +658,10 @@ public void setup() { /** {@inheritDoc} */ @Override public final void startBundle(StartBundleContext c) { - String project = c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreProject(); + String project = + this.projectId != null + ? this.projectId + : c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreProject(); if (project == null) { project = c.getPipelineOptions().as(GcpOptions.class).getProject(); } @@ -643,6 +669,18 @@ public final void startBundle(StartBundleContext c) { requireNonNull( project, "project must be defined on FirestoreOptions or GcpOptions of PipelineOptions"); + databaseId = + this.databaseId != null + ? this.databaseId + : c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreDb(); + databaseRootName = + DatabaseRootName.of( + requireNonNull( + project, + "project must be defined on FirestoreOptions or GcpOptions of PipelineOptions"), + requireNonNull( + databaseId, + "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); } @@ -662,6 +700,8 @@ public final void populateDisplayData(DisplayData.Builder builder) { } protected abstract InT setReadTime(InT element, Instant readTime); + + protected abstract InT setDatabase(InT element, String projectId, String databaseId); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java index 6bbb00e76f2d..ab33d8e5c166 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java @@ -244,7 +244,9 @@ public final void startBundle(StartBundleContext c) { requireNonNull( databaseId, "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); - firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); + firestoreStub = + firestoreStatefulComponentFactory.getFirestoreStub( + c.getPipelineOptions(), project, databaseId); } /** From 0b5af7a7804a89f0e6a16e75b1e647615e8c8907 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 00:55:33 +0000 Subject: [PATCH 06/17] Update read fn constructors to take project and database --- .../io/gcp/firestore/FirestoreV1ReadFn.java | 123 ++++++++++++------ .../io/gcp/firestore/BaseFirestoreFnTest.java | 1 + 2 files changed, 85 insertions(+), 39 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java index e23090e68727..fb97931ca2be 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java @@ -35,7 +35,6 @@ import com.google.firestore.v1.BatchGetDocumentsResponse; import com.google.firestore.v1.BatchGetDocumentsResponse.ResultCase; import com.google.firestore.v1.Cursor; -import com.google.firestore.v1.DatabaseRootName; import com.google.firestore.v1.ListCollectionIdsRequest; import com.google.firestore.v1.ListCollectionIdsResponse; import com.google.firestore.v1.ListDocumentsRequest; @@ -101,6 +100,17 @@ static final class RunQueryFn super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); } + RunQueryFn( + JodaClock clock, + FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, + RpcQosOptions rpcQosOptions, + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); + } + @Override public Context getRpcAttemptContext() { return FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.RunQuery; @@ -140,12 +150,6 @@ protected RunQueryRequest setReadTime(RunQueryRequest element, Instant readTime) return element.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } - @Override - protected RunQueryRequest setDatabase( - RunQueryRequest element, String projectId, String databaseId) { - return element.toBuilder().setParent(DocumentRootName.format(projectId, databaseId)).build(); - } - @Override protected @Nullable RunQueryResponse resumptionValue( @Nullable RunQueryResponse previousValue, RunQueryResponse nextValue) { @@ -174,7 +178,7 @@ public PartitionQueryFn( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, null); + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, null, null, null); } public PartitionQueryFn( @@ -182,7 +186,18 @@ public PartitionQueryFn( FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, null, null); + } + + public PartitionQueryFn( + JodaClock clock, + FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, + RpcQosOptions rpcQosOptions, + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, databaseId, projectId); } @Override @@ -206,10 +221,6 @@ public void processElement(ProcessContext context) throws Exception { try { PartitionQueryRequest request = setPageToken(element, aggregate); request = readTime == null ? request : setReadTime(request, readTime); - request = - projectId == null || databaseId == null - ? request - : setDatabase(request, projectId, databaseId); attempt.recordRequestStart(clock.instant()); PartitionQueryPagedResponse pagedResponse = firestoreStub.partitionQueryPagedCallable().call(request); @@ -255,12 +266,6 @@ private PartitionQueryRequest setPageToken( protected PartitionQueryRequest setReadTime(PartitionQueryRequest element, Instant readTime) { return element.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } - - @Override - protected PartitionQueryRequest setDatabase( - PartitionQueryRequest element, String projectId, String databaseId) { - return element.toBuilder().setParent().build(); - } } /** @@ -283,7 +288,7 @@ static final class ListDocumentsFn JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, null); + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, null, null, null); } ListDocumentsFn( @@ -291,7 +296,18 @@ static final class ListDocumentsFn FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, null, null); + } + + ListDocumentsFn( + JodaClock clock, + FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, + RpcQosOptions rpcQosOptions, + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } @Override @@ -337,7 +353,7 @@ static final class ListCollectionIdsFn JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, null); + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, null, null, null); } ListCollectionIdsFn( @@ -345,7 +361,18 @@ static final class ListCollectionIdsFn FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, null, null); + } + + ListCollectionIdsFn( + JodaClock clock, + FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, + RpcQosOptions rpcQosOptions, + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } @Override @@ -400,6 +427,17 @@ static final class BatchGetDocumentsFn super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); } + BatchGetDocumentsFn( + JodaClock clock, + FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, + RpcQosOptions rpcQosOptions, + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); + } + @Override public Context getRpcAttemptContext() { return FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchGetDocuments; @@ -475,7 +513,7 @@ protected StreamingFirestoreV1ReadFn( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, null); + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, null, null, null); } protected StreamingFirestoreV1ReadFn( @@ -483,7 +521,18 @@ protected StreamingFirestoreV1ReadFn( FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, null, null); + } + + protected StreamingFirestoreV1ReadFn( + JodaClock clock, + FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, + RpcQosOptions rpcQosOptions, + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } protected abstract ServerStreamingCallable getCallable(FirestoreStub firestoreStub); @@ -556,8 +605,11 @@ protected PaginatedFirestoreV1ReadFn( JodaClock clock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, - @Nullable Instant readTime) { - super(clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime); + @Nullable Instant readTime, + @Nullable String projectId, + @Nullable String databaseId) { + super( + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } protected abstract UnaryCallable getCallable( @@ -626,7 +678,6 @@ abstract static class BaseFirestoreV1ReadFn // transient running state information, not important to any possible checkpointing protected transient FirestoreStub firestoreStub; protected transient RpcQos rpcQos; - private transient DatabaseRootName databaseRootName; protected transient String projectId; protected transient @Nullable String databaseId; @@ -673,15 +724,11 @@ public final void startBundle(StartBundleContext c) { this.databaseId != null ? this.databaseId : c.getPipelineOptions().as(FirestoreOptions.class).getFirestoreDb(); - databaseRootName = - DatabaseRootName.of( - requireNonNull( - project, - "project must be defined on FirestoreOptions or GcpOptions of PipelineOptions"), - requireNonNull( - databaseId, - "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); - firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); + requireNonNull( + databaseId, "firestoreDb must be defined on FirestoreOptions of PipelineOptions"); + firestoreStub = + firestoreStatefulComponentFactory.getFirestoreStub( + c.getPipelineOptions(), projectId, databaseId); } /** {@inheritDoc} */ @@ -700,8 +747,6 @@ public final void populateDisplayData(DisplayData.Builder builder) { } protected abstract InT setReadTime(InT element, Instant readTime); - - protected abstract InT setDatabase(InT element, String projectId, String databaseId); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreFnTest.java index 4adce82dab2d..b7aeab2956ec 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreFnTest.java @@ -39,6 +39,7 @@ abstract class BaseFirestoreFnTest> { protected final String projectId = "testing-project"; + protected final String databaseId = "firestoredb"; // @Rule public final Timeout timeout = new Timeout(10, TimeUnit.SECONDS); @Mock protected DoFn.StartBundleContext startBundleContext; From 86bdfec4439ce960c791d68f1097146aeb61c3b8 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 01:24:29 +0000 Subject: [PATCH 07/17] fix build errors --- .../FirestoreStatefulComponentFactory.java | 24 ++++++++++++------- .../io/gcp/firestore/FirestoreV1ReadFn.java | 6 +++-- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 923f0819feeb..3fca5a085b9b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -80,12 +80,14 @@ FirestoreStub getFirestoreStub(PipelineOptions options) { *

The instance returned by this method is expected to bind to the lifecycle of a bundle. * * @param options The instance of options to read from - * @param projectId The Project ID to target. - * @param databaseId The Database ID to target. + * @param configuredProjectId The Project ID to target. + * @param configuredDatabaseId The Database ID to target. * @return a new {@link FirestoreStub} pre-configured with values from the provided options */ FirestoreStub getFirestoreStub( - PipelineOptions options, @Nullable String projectId, @Nullable String databaseId) { + PipelineOptions options, + @Nullable String configuredProjectId, + @Nullable String configuredDatabaseId) { try { FirestoreSettings.Builder builder = FirestoreSettings.newBuilder(); @@ -115,13 +117,17 @@ FirestoreStub getFirestoreStub( builder .setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential())) .setEndpoint(firestoreOptions.getFirestoreHost()); + String projectId = + configuredProjectId != null + ? configuredProjectId + : firestoreOptions.getFirestoreProject(); + if (projectId == null) { + projectId = gcpOptions.getProject(); + } + String databaseId = + configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb(); headers.put( - "x-goog-request-params", - "project_id=" + projectId != null - ? projectId - : gcpOptions.getProject() + "&database_id=" + databaseId != null - ? databaseId - : firestoreOptions.getFirestoreDb()); + "x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId); } builder.setHeaderProvider( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java index fb97931ca2be..02423480d97e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java @@ -197,7 +197,7 @@ public PartitionQueryFn( @Nullable String projectId, @Nullable String databaseId) { super( - clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, databaseId, projectId); + clock, firestoreStatefulComponentFactory, rpcQosOptions, readTime, projectId, databaseId); } @Override @@ -696,7 +696,9 @@ protected BaseFirestoreV1ReadFn( requireNonNull(firestoreStatefulComponentFactory, "firestoreFactory must be non null"); this.rpcQosOptions = requireNonNull(rpcQosOptions, "rpcQosOptions must be non null"); this.readTime = readTime; - this.projectId = projectId; + if (projectId != null) { + this.projectId = projectId; + } this.databaseId = databaseId; } From 71b003ef0ad748814449093b26b8b4cd5797a058 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 21:53:22 +0000 Subject: [PATCH 08/17] Update integration test and BatchGet test --- .../sdk/io/gcp/firestore/FirestoreV1.java | 10 ++++ .../io/gcp/firestore/FirestoreV1ReadFn.java | 1 + .../io/gcp/firestore/BaseFirestoreFnTest.java | 1 - .../FirestoreV1FnBatchGetDocumentsTest.java | 40 ++++++++++--- .../io/gcp/firestore/it/BaseFirestoreIT.java | 57 +++++++++++++++++-- 5 files changed, 95 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java index 60fe3c136467..3f22e636e8ab 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java @@ -2096,6 +2096,16 @@ public final BldrT withReadTime(@Nullable Instant readTime) { this.readTime = readTime; return self(); } + + public final BldrT withProjectId(@Nullable String projectId) { + this.projectId = projectId; + return self(); + } + + public final BldrT withDatabaseId(@Nullable String databaseId) { + this.databaseId = databaseId; + return self(); + } } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java index 02423480d97e..84e1cb1be0ac 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java @@ -738,6 +738,7 @@ public final void startBundle(StartBundleContext c) { @Override public void finishBundle() throws Exception { projectId = null; + databaseId = null; firestoreStub.close(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreFnTest.java index b7aeab2956ec..4adce82dab2d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreFnTest.java @@ -39,7 +39,6 @@ abstract class BaseFirestoreFnTest> { protected final String projectId = "testing-project"; - protected final String databaseId = "firestoredb"; // @Rule public final Timeout timeout = new Timeout(10, TimeUnit.SECONDS); @Mock protected DoFn.StartBundleContext startBundleContext; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java index b9c950e92fd5..52fa840bf0ad 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java @@ -54,7 +54,12 @@ @RunWith(Parameterized.class) public final class FirestoreV1FnBatchGetDocumentsTest extends BaseFirestoreV1ReadFnTest { - @Parameterized.Parameter public Instant readTime; + + @Parameterized.Parameter(0) + public Instant readTime; + + @Parameterized.Parameter(1) + public boolean setDatabaseOnReadFn; @Rule public MockitoRule rule = MockitoJUnit.rule(); @@ -65,9 +70,12 @@ public final class FirestoreV1FnBatchGetDocumentsTest @Mock private ServerStream responseStream2; @Mock private ServerStream responseStream3; - @Parameterized.Parameters(name = "readTime = {0}") - public static Collection data() { - return Arrays.asList(null, Instant.now()); + @Parameterized.Parameters(name = "readTime = {0}, setDatabaseOnReadFn = {1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {null, false}, {null, true}, {Instant.now(), false}, {Instant.now(), true} + }); } private BatchGetDocumentsRequest withReadTime( @@ -77,6 +85,14 @@ private BatchGetDocumentsRequest withReadTime( : request.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } + private void mockFirestoreStub() { + if (setDatabaseOnReadFn) { + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); + } else { + when(ff.getFirestoreStub(any())).thenReturn(stub); + } + } + @Test public void endToEnd() throws Exception { final BatchGetDocumentsRequest request = @@ -98,7 +114,7 @@ public void endToEnd() throws Exception { when(stub.batchGetDocumentsCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + mockFirestoreStub(); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(rpcQosOptions)); @@ -108,7 +124,7 @@ public void endToEnd() throws Exception { when(processContext.element()).thenReturn(request); - runFunction(new BatchGetDocumentsFn(clock, ff, rpcQosOptions, readTime)); + runFunction(getFnWithParameters()); List allValues = responsesCaptor.getAllValues(); assertEquals(responses, allValues); @@ -184,7 +200,7 @@ protected BatchGetDocumentsResponse computeNext() { when(stub.batchGetDocumentsCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + mockFirestoreStub(); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); @@ -196,7 +212,7 @@ protected BatchGetDocumentsResponse computeNext() { when(processContext.element()).thenReturn(request1); - BatchGetDocumentsFn fn = new BatchGetDocumentsFn(clock, ff, rpcQosOptions, readTime); + BatchGetDocumentsFn fn = getFnWithParameters(); runFunction(fn); @@ -246,6 +262,14 @@ protected BatchGetDocumentsFn getFn( return new BatchGetDocumentsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions); } + private BatchGetDocumentsFn getFnWithParameters() { + if (setDatabaseOnReadFn) { + return new BatchGetDocumentsFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)"); + } else { + return new BatchGetDocumentsFn(clock, ff, rpcQosOptions, readTime); + } + } + private static BatchGetDocumentsResponse newFound(int docNumber) { String docName = docName(docNumber); return BatchGetDocumentsResponse.newBuilder() diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java index 8695080cb885..89809ec6e786 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java @@ -162,6 +162,8 @@ public final void listCollections() throws Exception { FirestoreIO.v1() .read() .listCollectionIds() + .withProjectId(project) + .withDatabaseId(databaseId) .withRpcQosOptions(RPC_QOS_OPTIONS) .build()); @@ -177,6 +179,8 @@ public final void listCollections() throws Exception { FirestoreIO.v1() .read() .listCollectionIds() + .withProjectId(project) + .withDatabaseId(databaseId) .withReadTime(readTime) .withRpcQosOptions(RPC_QOS_OPTIONS) .build()); @@ -208,7 +212,13 @@ public final void listDocuments() throws Exception { .apply(Create.of("a")) .apply(getListDocumentsPTransform(testName.getMethodName())) .apply( - FirestoreIO.v1().read().listDocuments().withRpcQosOptions(RPC_QOS_OPTIONS).build()) + FirestoreIO.v1() + .read() + .listDocuments() + .withProjectId(project) + .withDatabaseId(databaseId) + .withRpcQosOptions(RPC_QOS_OPTIONS) + .build()) .apply(ParDo.of(new DocumentToName())); PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths); @@ -223,6 +233,8 @@ public final void listDocuments() throws Exception { FirestoreIO.v1() .read() .listDocuments() + .withProjectId(project) + .withDatabaseId(databaseId) .withReadTime(readTime) .withRpcQosOptions(RPC_QOS_OPTIONS) .build()) @@ -259,7 +271,14 @@ public final void runQuery() throws Exception { testPipeline .apply(Create.of(collectionId)) .apply(getRunQueryPTransform(testName.getMethodName())) - .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(RPC_QOS_OPTIONS).build()) + .apply( + FirestoreIO.v1() + .read() + .runQuery() + .withProjectId(project) + .withDatabaseId(databaseId) + .withRpcQosOptions(RPC_QOS_OPTIONS) + .build()) .apply(ParDo.of(new RunQueryResponseToDocument())) .apply(ParDo.of(new DocumentToName())); @@ -275,6 +294,8 @@ public final void runQuery() throws Exception { FirestoreIO.v1() .read() .runQuery() + .withProjectId(project) + .withDatabaseId(databaseId) .withReadTime(readTime) .withRpcQosOptions(RPC_QOS_OPTIONS) .build()) @@ -317,8 +338,21 @@ public final void partitionQuery() throws Exception { testPipeline .apply(Create.of(collectionGroupId)) .apply(getPartitionQueryPTransform(testName.getMethodName(), partitionCount)) - .apply(FirestoreIO.v1().read().partitionQuery().withNameOnlyQuery().build()) - .apply(FirestoreIO.v1().read().runQuery().build()) + .apply( + FirestoreIO.v1() + .read() + .partitionQuery() + .withProjectId(project) + .withDatabaseId(databaseId) + .withNameOnlyQuery() + .build()) + .apply( + FirestoreIO.v1() + .read() + .runQuery() + .withProjectId(project) + .withDatabaseId(databaseId) + .build()) .apply(ParDo.of(new RunQueryResponseToDocument())) .apply(ParDo.of(new DocumentToName())); @@ -334,10 +368,19 @@ public final void partitionQuery() throws Exception { FirestoreIO.v1() .read() .partitionQuery() + .withProjectId(project) + .withDatabaseId(databaseId) .withReadTime(readTime) .withNameOnlyQuery() .build()) - .apply(FirestoreIO.v1().read().runQuery().withReadTime(readTime).build()) + .apply( + FirestoreIO.v1() + .read() + .runQuery() + .withProjectId(project) + .withDatabaseId(databaseId) + .withReadTime(readTime) + .build()) .apply(ParDo.of(new RunQueryResponseToDocument())) .apply(ParDo.of(new DocumentToName())); @@ -380,6 +423,8 @@ public final void batchGet() throws Exception { FirestoreIO.v1() .read() .batchGetDocuments() + .withProjectId(project) + .withDatabaseId(databaseId) .withRpcQosOptions(RPC_QOS_OPTIONS) .build()) .apply(Filter.by(BatchGetDocumentsResponse::hasFound)) @@ -398,6 +443,8 @@ public final void batchGet() throws Exception { FirestoreIO.v1() .read() .batchGetDocuments() + .withProjectId(project) + .withDatabaseId(databaseId) .withReadTime(readTime) .withRpcQosOptions(RPC_QOS_OPTIONS) .build()) From 22a59cde4ceb6856d8ffe4c5cdc1ef142818b8c0 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 23:26:07 +0000 Subject: [PATCH 09/17] Update remaining unit tests --- .../firestore/BaseFirestoreV1ReadFnTest.java | 4 +-- .../firestore/BaseFirestoreV1WriteFnTest.java | 6 ++-- .../FirestoreV1FnBatchGetDocumentsTest.java | 18 +++------- ...V1FnBatchWriteWithDeadLetterQueueTest.java | 2 +- ...irestoreV1FnBatchWriteWithSummaryTest.java | 2 +- .../FirestoreV1FnListCollectionIdsTest.java | 30 +++++++++++----- .../FirestoreV1FnListDocumentsTest.java | 30 ++++++++++++---- .../FirestoreV1FnPartitionQueryTest.java | 34 ++++++++++++++----- .../firestore/FirestoreV1FnRunQueryTest.java | 28 +++++++++++---- 9 files changed, 104 insertions(+), 50 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java index 0aab59d3aacd..5c28d3fc99ea 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java @@ -45,7 +45,7 @@ abstract class BaseFirestoreV1ReadFnTest public final void attemptsExhaustedForRetryableError() throws Exception { BaseFirestoreV1ReadFn fn = getFn(clock, ff, rpcQosOptions); V1RpcFnTestCtx ctx = newCtx(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt); ctx.mockRpcToCallable(stub); @@ -79,7 +79,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { public final void noRequestIsSentIfNotSafeToProceed() throws Exception { BaseFirestoreV1ReadFn fn = getFn(clock, ff, rpcQosOptions); V1RpcFnTestCtx ctx = newCtx(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java index 623f947c45a7..355322162005 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java @@ -129,7 +129,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { Write write = newWrite(); Element element1 = new WriteElement(0, write, window); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)) .thenReturn(attempt); @@ -175,7 +175,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { @Override @Test public final void noRequestIsSentIfNotSafeToProceed() throws Exception { - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)) .thenReturn(attempt); @@ -369,7 +369,7 @@ public final void endToEnd_deadlineExceededOnAnIndividualWriteResultsInThrottlin LOG.debug("options = {}", options); FirestoreStatefulComponentFactory ff = mock(FirestoreStatefulComponentFactory.class); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); Random random = new Random(12345); TestClock clock = new TestClock(Instant.EPOCH, Duration.standardSeconds(1)); Sleeper sleeper = millis -> clock.setNext(advanceClockBy(Duration.millis(millis))); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java index 52fa840bf0ad..1dec02ad40a4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java @@ -59,7 +59,7 @@ public final class FirestoreV1FnBatchGetDocumentsTest public Instant readTime; @Parameterized.Parameter(1) - public boolean setDatabaseOnReadFn; + public boolean setDatabaseOnFn; @Rule public MockitoRule rule = MockitoJUnit.rule(); @@ -70,7 +70,7 @@ public final class FirestoreV1FnBatchGetDocumentsTest @Mock private ServerStream responseStream2; @Mock private ServerStream responseStream3; - @Parameterized.Parameters(name = "readTime = {0}, setDatabaseOnReadFn = {1}") + @Parameterized.Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}") public static Collection data() { return Arrays.asList( new Object[][] { @@ -85,14 +85,6 @@ private BatchGetDocumentsRequest withReadTime( : request.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build(); } - private void mockFirestoreStub() { - if (setDatabaseOnReadFn) { - when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); - } else { - when(ff.getFirestoreStub(any())).thenReturn(stub); - } - } - @Test public void endToEnd() throws Exception { final BatchGetDocumentsRequest request = @@ -114,7 +106,7 @@ public void endToEnd() throws Exception { when(stub.batchGetDocumentsCallable()).thenReturn(callable); - mockFirestoreStub(); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(rpcQosOptions)); @@ -200,7 +192,7 @@ protected BatchGetDocumentsResponse computeNext() { when(stub.batchGetDocumentsCallable()).thenReturn(callable); - mockFirestoreStub(); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); @@ -263,7 +255,7 @@ protected BatchGetDocumentsFn getFn( } private BatchGetDocumentsFn getFnWithParameters() { - if (setDatabaseOnReadFn) { + if (setDatabaseOnFn) { return new BatchGetDocumentsFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)"); } else { return new BatchGetDocumentsFn(clock, ff, rpcQosOptions, readTime); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java index 35d0ea9482d3..e7f98ff73c6b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java @@ -67,7 +67,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception { int maxBytes = 50; RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java index 3e37e3975bf5..e7174537943e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java @@ -76,7 +76,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception { int maxBytes = 50; RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListCollectionIdsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListCollectionIdsTest.java index eb3cd2692c8e..e99d42427316 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListCollectionIdsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListCollectionIdsTest.java @@ -57,6 +57,9 @@ public final class FirestoreV1FnListCollectionIdsTest @Parameter public Instant readTime; + @Parameter(1) + public boolean setDatabaseOnFn; + @Rule public MockitoRule rule = MockitoJUnit.rule(); @Mock private UnaryCallable callable; @@ -65,9 +68,12 @@ public final class FirestoreV1FnListCollectionIdsTest @Mock private ListCollectionIdsPagedResponse pagedResponse2; @Mock private ListCollectionIdsPage page2; - @Parameters(name = "readTime = {0}") - public static Collection data() { - return Arrays.asList(null, Instant.now()); + @Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {null, false}, {null, true}, {Instant.now(), false}, {Instant.now(), true} + }); } private ListCollectionIdsRequest withReadTime(ListCollectionIdsRequest input, Instant readTime) { @@ -104,7 +110,7 @@ public void endToEnd() throws Exception { when(stub.listCollectionIdsPagedCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); RpcQosOptions options = RpcQosOptions.defaultOptions(); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); @@ -116,7 +122,7 @@ public void endToEnd() throws Exception { when(processContext.element()).thenReturn(request1); - ListCollectionIdsFn fn = new ListCollectionIdsFn(clock, ff, options, readTime); + ListCollectionIdsFn fn = getFnWithParameters(); runFunction(fn); @@ -127,7 +133,7 @@ public void endToEnd() throws Exception { @Override public void resumeFromLastReadValue() throws Exception { - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); @@ -186,7 +192,7 @@ protected ListCollectionIdsPage computeNext() { when(stub.listCollectionIdsPagedCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); ArgumentCaptor responses = ArgumentCaptor.forClass(ListCollectionIdsResponse.class); @@ -195,7 +201,7 @@ protected ListCollectionIdsPage computeNext() { when(processContext.element()).thenReturn(request1); - ListCollectionIdsFn fn = new ListCollectionIdsFn(clock, ff, rpcQosOptions, readTime); + ListCollectionIdsFn fn = getFnWithParameters(); runFunction(fn); @@ -238,4 +244,12 @@ protected ListCollectionIdsFn getFn( RpcQosOptions rpcQosOptions) { return new ListCollectionIdsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions); } + + private ListCollectionIdsFn getFnWithParameters() { + if (setDatabaseOnFn) { + return new ListCollectionIdsFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)"); + } else { + return new ListCollectionIdsFn(clock, ff, rpcQosOptions, readTime); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListDocumentsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListDocumentsTest.java index 2faa7c3e2f1b..54827f6d6017 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListDocumentsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListDocumentsTest.java @@ -47,6 +47,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; @@ -58,6 +60,9 @@ public final class FirestoreV1FnListDocumentsTest @Parameterized.Parameter public Instant readTime; + @Parameter(1) + public boolean setDatabaseOnFn; + @Rule public MockitoRule rule = MockitoJUnit.rule(); @Mock private UnaryCallable callable; @@ -66,9 +71,12 @@ public final class FirestoreV1FnListDocumentsTest @Mock private ListDocumentsPagedResponse pagedResponse2; @Mock private ListDocumentsPage page2; - @Parameterized.Parameters(name = "readTime = {0}") - public static Collection data() { - return Arrays.asList(null, Instant.now()); + @Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {null, false}, {null, true}, {Instant.now(), false}, {Instant.now(), true} + }); } private ListDocumentsRequest withReadTime(ListDocumentsRequest request, Instant readTime) { @@ -127,7 +135,7 @@ public void endToEnd() throws Exception { when(stub.listDocumentsPagedCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); RpcQosOptions options = RpcQosOptions.defaultOptions(); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); @@ -139,7 +147,7 @@ public void endToEnd() throws Exception { when(processContext.element()).thenReturn(request1); - ListDocumentsFn fn = new ListDocumentsFn(clock, ff, options, readTime); + ListDocumentsFn fn = getFnWithParameters(); runFunction(fn); @@ -150,7 +158,7 @@ public void endToEnd() throws Exception { @Override public void resumeFromLastReadValue() throws Exception { - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); @@ -231,7 +239,7 @@ protected ListDocumentsPage computeNext() { when(stub.listDocumentsPagedCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); ArgumentCaptor responses = ArgumentCaptor.forClass(ListDocumentsResponse.class); @@ -283,4 +291,12 @@ protected ListDocumentsFn getFn( RpcQosOptions rpcQosOptions) { return new ListDocumentsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions); } + + private ListDocumentsFn getFnWithParameters() { + if (setDatabaseOnFn) { + return new ListDocumentsFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)"); + } else { + return new ListDocumentsFn(clock, ff, rpcQosOptions, readTime); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java index d6c69fbd96b2..20f728bab73a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java @@ -47,6 +47,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; @@ -58,6 +60,9 @@ public final class FirestoreV1FnPartitionQueryTest @Parameterized.Parameter public Instant readTime; + @Parameter(1) + public boolean setDatabaseOnFn; + @Rule public MockitoRule rule = MockitoJUnit.rule(); @Mock private UnaryCallable callable; @@ -66,9 +71,12 @@ public final class FirestoreV1FnPartitionQueryTest @Mock private PartitionQueryPagedResponse pagedResponse2; @Mock private PartitionQueryPage page2; - @Parameterized.Parameters(name = "readTime = {0}") - public static Collection data() { - return Arrays.asList(null, Instant.now()); + @Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {null, false}, {null, true}, {Instant.now(), false}, {Instant.now(), true} + }); } private PartitionQueryRequest withReadTime(PartitionQueryRequest request, Instant readTime) { @@ -101,7 +109,7 @@ public void endToEnd() throws Exception { when(stub.partitionQueryPagedCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); RpcQosOptions options = RpcQosOptions.defaultOptions(); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); @@ -113,7 +121,7 @@ public void endToEnd() throws Exception { when(processContext.element()).thenReturn(request1); - PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options, readTime); + PartitionQueryFn fn = getFnWithParameters(); runFunction(fn); @@ -136,7 +144,7 @@ public void endToEnd_emptyCursors() throws Exception { when(stub.partitionQueryPagedCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); RpcQosOptions options = RpcQosOptions.defaultOptions(); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); @@ -148,7 +156,7 @@ public void endToEnd_emptyCursors() throws Exception { when(processContext.element()).thenReturn(request1); - PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options, readTime); + PartitionQueryFn fn = getFnWithParameters(); runFunction(fn); @@ -159,7 +167,7 @@ public void endToEnd_emptyCursors() throws Exception { @Override public void resumeFromLastReadValue() throws Exception { - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); @@ -230,7 +238,7 @@ protected PartitionQueryPage computeNext() { when(stub.partitionQueryPagedCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); ArgumentCaptor responses = ArgumentCaptor.forClass(PartitionQueryPair.class); @@ -283,4 +291,12 @@ protected PartitionQueryFn getFn( RpcQosOptions rpcQosOptions) { return new PartitionQueryFn(clock, firestoreStatefulComponentFactory, rpcQosOptions); } + + private PartitionQueryFn getFnWithParameters() { + if (setDatabaseOnFn) { + return new PartitionQueryFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)"); + } else { + return new PartitionQueryFn(clock, ff, rpcQosOptions, readTime); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnRunQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnRunQueryTest.java index 02e5f9743eaa..78dad6faeaea 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnRunQueryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnRunQueryTest.java @@ -59,6 +59,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; @@ -70,15 +72,21 @@ public final class FirestoreV1FnRunQueryTest @Parameterized.Parameter public Instant readTime; + @Parameter(1) + public boolean setDatabaseOnFn; + @Rule public MockitoRule rule = MockitoJUnit.rule(); @Mock private ServerStreamingCallable callable; @Mock private ServerStream responseStream; @Mock private ServerStream retryResponseStream; - @Parameterized.Parameters(name = "readTime = {0}") - public static Collection data() { - return Arrays.asList(null, Instant.now()); + @Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {null, false}, {null, true}, {Instant.now(), false}, {Instant.now(), true} + }); } private RunQueryRequest withReadTime(RunQueryRequest request, Instant readTime) { @@ -100,7 +108,7 @@ public void endToEnd() throws Exception { when(stub.runQueryCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); RpcQosOptions options = RpcQosOptions.defaultOptions(); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); @@ -112,7 +120,7 @@ public void endToEnd() throws Exception { when(processContext.element()).thenReturn(testData.request); - RunQueryFn fn = new RunQueryFn(clock, ff, options, readTime); + RunQueryFn fn = getFnWithParameters(); runFunction(fn); @@ -242,7 +250,7 @@ protected RunQueryResponse computeNext() { when(stub.runQueryCallable()).thenReturn(callable); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); @@ -302,6 +310,14 @@ protected RunQueryFn getFn( return new RunQueryFn(clock, firestoreStatefulComponentFactory, rpcQosOptions); } + private RunQueryFn getFnWithParameters() { + if (setDatabaseOnFn) { + return new RunQueryFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)"); + } else { + return new RunQueryFn(clock, ff, rpcQosOptions, readTime); + } + } + private static final class TestData { static final FieldReference FILTER_FIELD_PATH = From d5ba7c80f973a6dbd55618a87541837aad618630 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 23:37:31 +0000 Subject: [PATCH 10/17] Fix write unit test --- .../beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java index 355322162005..f20181fbc320 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java @@ -111,7 +111,7 @@ public final void setUp() { when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2); when(ff.getRpcQos(any())).thenReturn(rpcQos); - when(ff.getFirestoreStub(pipelineOptions)).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(stub.batchWriteCallable()).thenReturn(callable); metricsFixture = new MetricsFixture(); } From d52db6c0beaf2f57ef725f7a4722142d0f899560 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 00:22:30 +0000 Subject: [PATCH 11/17] Minor format changes --- .../FirestoreStatefulComponentFactory.java | 29 ++++--------------- 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 3fca5a085b9b..8febb7eb4f95 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -65,25 +65,6 @@ private FirestoreStatefulComponentFactory() {} * @param options The instance of options to read from * @return a new {@link FirestoreStub} pre-configured with values from the provided options */ - FirestoreStub getFirestoreStub(PipelineOptions options) { - return getFirestoreStub(options, null, null); - } - - /** - * Given a {@link PipelineOptions}, return a pre-configured {@link FirestoreStub} with values set - * based on those options. - * - *

The provided {@link PipelineOptions} is expected to provide {@link FirestoreOptions} and - * {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions GcpOptions} for access to {@link - * GcpOptions#getProject()} - * - *

The instance returned by this method is expected to bind to the lifecycle of a bundle. - * - * @param options The instance of options to read from - * @param configuredProjectId The Project ID to target. - * @param configuredDatabaseId The Database ID to target. - * @return a new {@link FirestoreStub} pre-configured with values from the provided options - */ FirestoreStub getFirestoreStub( PipelineOptions options, @Nullable String configuredProjectId, @@ -118,16 +99,16 @@ FirestoreStub getFirestoreStub( .setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential())) .setEndpoint(firestoreOptions.getFirestoreHost()); String projectId = - configuredProjectId != null - ? configuredProjectId - : firestoreOptions.getFirestoreProject(); + configuredProjectId != null + ? configuredProjectId + : firestoreOptions.getFirestoreProject(); if (projectId == null) { projectId = gcpOptions.getProject(); } String databaseId = - configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb(); + configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb(); headers.put( - "x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId); + "x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId); } builder.setHeaderProvider( From b13da4d5a3de0c11a77bcfcbefaa02d78a539173 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 00:26:28 +0000 Subject: [PATCH 12/17] update Changes.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 222d4b82cb25..cd016c53243a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,7 +72,7 @@ ## New Features / Improvements -* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). * Python examples added for Milvus search enrichment handler on [Beam Website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-milvus/) including jupyter notebook example (Python) ([#36176](https://github.com/apache/beam/issues/36176)). From aa9f70d5bf51c83929a42d1de3c8790da7d60cda Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 00:31:33 +0000 Subject: [PATCH 13/17] Update javadoc comment for getFirestoreStub --- .../sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 8febb7eb4f95..81cdd0d3340b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -63,6 +63,8 @@ private FirestoreStatefulComponentFactory() {} *

The instance returned by this method is expected to bind to the lifecycle of a bundle. * * @param options The instance of options to read from + * @param configuredProjectId The project to target, if null, falls back to value in options. + * @param configuredDatabaseId The database to target, if null, falls back to value in options. * @return a new {@link FirestoreStub} pre-configured with values from the provided options */ FirestoreStub getFirestoreStub( From 9a67563571119cc26c5a7f03336983820b8a46b3 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 01:00:03 +0000 Subject: [PATCH 14/17] Run spotlessApply --- .../firestore/FirestoreStatefulComponentFactory.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 81cdd0d3340b..fd124cb9236f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -101,16 +101,16 @@ FirestoreStub getFirestoreStub( .setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential())) .setEndpoint(firestoreOptions.getFirestoreHost()); String projectId = - configuredProjectId != null - ? configuredProjectId - : firestoreOptions.getFirestoreProject(); + configuredProjectId != null + ? configuredProjectId + : firestoreOptions.getFirestoreProject(); if (projectId == null) { projectId = gcpOptions.getProject(); } String databaseId = - configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb(); + configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb(); headers.put( - "x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId); + "x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId); } builder.setHeaderProvider( From 9a398e98f47ce650871aa603003574d4bcafe423 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 17:44:54 +0000 Subject: [PATCH 15/17] fix changes.md --- CHANGES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index a7ed596b73f4..aa89d7ae04dd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,9 +72,11 @@ ## New Features / Improvements -* Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). * Python examples added for Milvus search enrichment handler on [Beam Website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-milvus/) including jupyter notebook example (Python) ([#36176](https://github.com/apache/beam/issues/36176)). +* Milvus sink I/O connector added (Python) ([#36702](https://github.com/apache/beam/issues/36702)). + Now Beam has full support for Milvus integration including Milvus enrichment and sink operations. +* Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). ## Breaking Changes From ed4001d067d0aefa23f6c500c8cef9c14d1a5465 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 17:47:45 +0000 Subject: [PATCH 16/17] Fix changes.md conflict --- CHANGES.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index aa89d7ae04dd..c301c2c9308b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,9 +74,6 @@ * Python examples added for Milvus search enrichment handler on [Beam Website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-milvus/) including jupyter notebook example (Python) ([#36176](https://github.com/apache/beam/issues/36176)). -* Milvus sink I/O connector added (Python) ([#36702](https://github.com/apache/beam/issues/36702)). - Now Beam has full support for Milvus integration including Milvus enrichment and sink operations. -* Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). ## Breaking Changes From 766959941d12b0514cc884c19088bdf4836e75f9 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 17:51:56 +0000 Subject: [PATCH 17/17] Take 3 fix changes.md --- CHANGES.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c301c2c9308b..252d34aa9af6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,8 +72,7 @@ ## New Features / Improvements -* Python examples added for Milvus search enrichment handler on [Beam Website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-milvus/) - including jupyter notebook example (Python) ([#36176](https://github.com/apache/beam/issues/36176)). +* Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). ## Breaking Changes