diff --git a/CHANGES.md b/CHANGES.md index 222d4b82cb25..d7c1e05dfa3e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -87,7 +87,7 @@ ## Bugfixes -* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)). ## Known Issues diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java index 5adc9ef38f36..8b90594bb655 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java @@ -48,10 +48,7 @@ public interface FirestoreOptions extends PipelineOptions { */ void setEmulatorHost(String host); - /** - * The Firestore database ID to connect to. Note: named database is currently an internal feature - * in Firestore. Do not set this to anything other than "(default)". - */ + /** The Firestore database ID to connect to. */ @Description("Firestore database ID") @Default.String("(default)") String getFirestoreDb(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 4e8c11f7072c..390e102b6010 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -29,6 +29,7 @@ import java.io.Serializable; import java.security.SecureRandom; import java.util.Map; +import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -65,6 +66,13 @@ private FirestoreStatefulComponentFactory() {} * @return a new {@link FirestoreStub} pre-configured with values from the provided options */ FirestoreStub getFirestoreStub(PipelineOptions options) { + return getFirestoreStub(options, null, null); + } + + FirestoreStub getFirestoreStub( + PipelineOptions options, + @Nullable String configuredProjectId, + @Nullable String configuredDatabaseId) { try { FirestoreSettings.Builder builder = FirestoreSettings.newBuilder(); @@ -94,12 +102,17 @@ FirestoreStub getFirestoreStub(PipelineOptions options) { builder .setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential())) .setEndpoint(firestoreOptions.getFirestoreHost()); + String projectId = + configuredProjectId != null + ? configuredProjectId + : firestoreOptions.getFirestoreProject(); + if (projectId == null) { + projectId = gcpOptions.getProject(); + } + String databaseId = + configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb(); headers.put( - "x-goog-request-params", - "project_id=" - + gcpOptions.getProject() - + "&database_id=" - + firestoreOptions.getFirestoreDb()); + "x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId); } builder.setHeaderProvider( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java index 6bbb00e76f2d..ab33d8e5c166 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java @@ -244,7 +244,9 @@ public final void startBundle(StartBundleContext c) { requireNonNull( databaseId, "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); - firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); + firestoreStub = + firestoreStatefulComponentFactory.getFirestoreStub( + c.getPipelineOptions(), project, databaseId); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java index 623f947c45a7..f20181fbc320 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java @@ -111,7 +111,7 @@ public final void setUp() { when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2); when(ff.getRpcQos(any())).thenReturn(rpcQos); - when(ff.getFirestoreStub(pipelineOptions)).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(stub.batchWriteCallable()).thenReturn(callable); metricsFixture = new MetricsFixture(); } @@ -129,7 +129,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { Write write = newWrite(); Element element1 = new WriteElement(0, write, window); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)) .thenReturn(attempt); @@ -175,7 +175,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { @Override @Test public final void noRequestIsSentIfNotSafeToProceed() throws Exception { - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)) .thenReturn(attempt); @@ -369,7 +369,7 @@ public final void endToEnd_deadlineExceededOnAnIndividualWriteResultsInThrottlin LOG.debug("options = {}", options); FirestoreStatefulComponentFactory ff = mock(FirestoreStatefulComponentFactory.class); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); Random random = new Random(12345); TestClock clock = new TestClock(Instant.EPOCH, Duration.standardSeconds(1)); Sleeper sleeper = millis -> clock.setNext(advanceClockBy(Duration.millis(millis))); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java index 35d0ea9482d3..e7f98ff73c6b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java @@ -67,7 +67,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception { int maxBytes = 50; RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java index 3e37e3975bf5..e7174537943e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java @@ -76,7 +76,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception { int maxBytes = 50; RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java index 8695080cb885..14344b105b35 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java @@ -42,6 +42,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; +import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions; import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions; import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.CleanupMode; import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.DataLayout; @@ -97,7 +98,7 @@ abstract class BaseFirestoreIT { @Before public void setup() { project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); - databaseId = "firestoredb"; + databaseId = TestPipeline.testingPipelineOptions().as(FirestoreOptions.class).getFirestoreDb(); } private static Instant toWriteTime(WriteResult result) {