Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)).

## Known Issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ public interface FirestoreOptions extends PipelineOptions {
*/
void setEmulatorHost(String host);

/**
* The Firestore database ID to connect to. Note: named database is currently an internal feature
* in Firestore. Do not set this to anything other than "(default)".
*/
/** The Firestore database ID to connect to. */
@Description("Firestore database ID")
@Default.String("(default)")
String getFirestoreDb();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -65,6 +66,13 @@ private FirestoreStatefulComponentFactory() {}
* @return a new {@link FirestoreStub} pre-configured with values from the provided options
*/
FirestoreStub getFirestoreStub(PipelineOptions options) {
return getFirestoreStub(options, null, null);
}

FirestoreStub getFirestoreStub(
PipelineOptions options,
@Nullable String configuredProjectId,
@Nullable String configuredDatabaseId) {
try {
FirestoreSettings.Builder builder = FirestoreSettings.newBuilder();

Expand Down Expand Up @@ -94,12 +102,17 @@ FirestoreStub getFirestoreStub(PipelineOptions options) {
builder
.setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential()))
.setEndpoint(firestoreOptions.getFirestoreHost());
String projectId =
configuredProjectId != null
? configuredProjectId
: firestoreOptions.getFirestoreProject();
if (projectId == null) {
projectId = gcpOptions.getProject();
}
String databaseId =
configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb();
headers.put(
"x-goog-request-params",
"project_id="
+ gcpOptions.getProject()
+ "&database_id="
+ firestoreOptions.getFirestoreDb());
"x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId);
}

builder.setHeaderProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ public final void startBundle(StartBundleContext c) {
requireNonNull(
databaseId,
"firestoreDb must be defined on FirestoreOptions of PipelineOptions"));
firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions());
firestoreStub =
firestoreStatefulComponentFactory.getFirestoreStub(
c.getPipelineOptions(), project, databaseId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public final void setUp() {
when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2);

when(ff.getRpcQos(any())).thenReturn(rpcQos);
when(ff.getFirestoreStub(pipelineOptions)).thenReturn(stub);
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(stub.batchWriteCallable()).thenReturn(callable);
metricsFixture = new MetricsFixture();
}
Expand All @@ -129,7 +129,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception {
Write write = newWrite();
Element<Write> element1 = new WriteElement(0, write, window);

when(ff.getFirestoreStub(any())).thenReturn(stub);
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(ff.getRpcQos(any())).thenReturn(rpcQos);
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
.thenReturn(attempt);
Expand Down Expand Up @@ -175,7 +175,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception {
@Override
@Test
public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
when(ff.getFirestoreStub(any())).thenReturn(stub);
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(ff.getRpcQos(any())).thenReturn(rpcQos);
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
.thenReturn(attempt);
Expand Down Expand Up @@ -369,7 +369,7 @@ public final void endToEnd_deadlineExceededOnAnIndividualWriteResultsInThrottlin
LOG.debug("options = {}", options);

FirestoreStatefulComponentFactory ff = mock(FirestoreStatefulComponentFactory.class);
when(ff.getFirestoreStub(any())).thenReturn(stub);
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
Random random = new Random(12345);
TestClock clock = new TestClock(Instant.EPOCH, Duration.standardSeconds(1));
Sleeper sleeper = millis -> clock.setNext(advanceClockBy(Duration.millis(millis)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception {
int maxBytes = 50;
RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();

when(ff.getFirestoreStub(any())).thenReturn(stub);
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(ff.getRpcQos(any()))
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception {
int maxBytes = 50;
RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();

when(ff.getFirestoreStub(any())).thenReturn(stub);
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(ff.getRpcQos(any()))
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.CleanupMode;
import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.DataLayout;
Expand Down Expand Up @@ -97,7 +98,7 @@ abstract class BaseFirestoreIT {
@Before
public void setup() {
project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
databaseId = "firestoredb";
databaseId = TestPipeline.testingPipelineOptions().as(FirestoreOptions.class).getFirestoreDb();
}

private static Instant toWriteTime(WriteResult result) {
Expand Down
Loading