Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)).

## Known Issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -65,6 +66,13 @@ private FirestoreStatefulComponentFactory() {}
* @return a new {@link FirestoreStub} pre-configured with values from the provided options
*/
FirestoreStub getFirestoreStub(PipelineOptions options) {
return getFirestoreStub(options, null, null);
}

FirestoreStub getFirestoreStub(
PipelineOptions options,
@Nullable String configuredProjectId,
@Nullable String configuredDatabaseId) {
try {
FirestoreSettings.Builder builder = FirestoreSettings.newBuilder();

Expand Down Expand Up @@ -94,12 +102,17 @@ FirestoreStub getFirestoreStub(PipelineOptions options) {
builder
.setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential()))
.setEndpoint(firestoreOptions.getFirestoreHost());
String projectId =
configuredProjectId != null
? configuredProjectId
: firestoreOptions.getFirestoreProject();
if (projectId == null) {
projectId = gcpOptions.getProject();
}
String databaseId =
configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb();
headers.put(
"x-goog-request-params",
"project_id="
+ gcpOptions.getProject()
+ "&database_id="
+ firestoreOptions.getFirestoreDb());
"x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId);
}

builder.setHeaderProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ public final void startBundle(StartBundleContext c) {
requireNonNull(
databaseId,
"firestoreDb must be defined on FirestoreOptions of PipelineOptions"));
firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions());
firestoreStub =
firestoreStatefulComponentFactory.getFirestoreStub(
c.getPipelineOptions(), project, databaseId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ abstract class BaseFirestoreIT {
public final TestName testName = new TestName();

@Rule(order = 2)
public final FirestoreTestingHelper helper = new FirestoreTestingHelper(CleanupMode.ALWAYS);
public final FirestoreTestingHelper helper =
new FirestoreTestingHelper(CleanupMode.ALWAYS, "firestoredb");

@Rule(order = 3)
public final TestPipeline testPipeline = TestPipeline.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -109,7 +110,6 @@ enum DataLayout {
private final GcpOptions gcpOptions;
private final org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions firestoreBeamOptions;
private final FirestoreOptions firestoreOptions;

private final Firestore fs;
private final FirestoreRpc rpc;
private final CleanupMode cleanupMode;
Expand All @@ -125,7 +125,7 @@ enum DataLayout {

@SuppressWarnings(
"initialization.fields.uninitialized") // testClass and testName are managed via #apply
public FirestoreTestingHelper(CleanupMode cleanupMode) {
public FirestoreTestingHelper(CleanupMode cleanupMode, @Nullable String databaseId) {
this.cleanupMode = cleanupMode;
gcpOptions = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
firestoreBeamOptions =
Expand All @@ -136,7 +136,7 @@ public FirestoreTestingHelper(CleanupMode cleanupMode) {
.setCredentials(gcpOptions.getGcpCredential())
.setProjectId(
firstNonNull(firestoreBeamOptions.getFirestoreProject(), gcpOptions.getProject()))
.setDatabaseId(firestoreBeamOptions.getFirestoreDb())
.setDatabaseId(firstNonNull(databaseId, firestoreBeamOptions.getFirestoreDb()))
.setHost(firestoreBeamOptions.getFirestoreHost())
.build();
fs = firestoreOptions.getService();
Expand Down
Loading