From 315b72e5ecc719ffde3b1e130e769acea47c4d96 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 26 Nov 2025 19:39:51 +0100 Subject: [PATCH 1/2] fix retry scenario for query to table materialization --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 ++ .../bigquery/BigQueryStorageQuerySource.java | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7aef1bd1ce02..69b9c62ceea9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -979,6 +979,8 @@ public void processElement( getParseFn(), getOutputCoder(), getBigQueryServices()); + // due to retry, table may already exist, remove it to ensure correctness + querySource.removeDestinationIfExists(options.as(BigQueryOptions.class)); Table queryResultTable = querySource.getTargetTable(options.as(BigQueryOptions.class)); BigQueryStorageTableSource output = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java index a2350ef19a74..5c160712fa0e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.api.services.bigquery.model.JobStatistics; @@ -25,6 +26,7 @@ import com.google.cloud.bigquery.storage.v1.DataFormat; import java.io.IOException; import java.io.ObjectInputStream; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority; @@ -188,4 +190,26 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { protected @Nullable String getTargetTableId(BigQueryOptions options) throws Exception { return null; } + + void removeDestinationIfExists(BigQueryOptions options) throws Exception { + DatasetService datasetService = bqServices.getDatasetService(options.as(BigQueryOptions.class)); + String project = queryTempProject; + if (project == null) { + project = + options.as(BigQueryOptions.class).getBigQueryProject() == null + ? options.as(BigQueryOptions.class).getProject() + : options.as(BigQueryOptions.class).getBigQueryProject(); + } + String tempTableID = + BigQueryResourceNaming.createJobIdPrefix( + options.getJobName(), stepUuid, BigQueryResourceNaming.JobType.QUERY); + TableReference tempTableReference = + createTempTableReference(project, tempTableID, Optional.ofNullable(queryTempDataset)); + Table destTable = datasetService.getTable(tempTableReference); + System.out.println("remoooving" + tempTableReference); + if (destTable != null) { + System.out.println("remoooving" + tempTableReference); + datasetService.deleteTable(tempTableReference); + } + } } From e5d80e15d5cc4fefba332981c0909a18d53f7005 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 26 Nov 2025 19:40:56 +0100 Subject: [PATCH 2/2] fix retry scenario for query to table materialization --- .../beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java index 5c160712fa0e..07c3273c293c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java @@ -206,9 +206,7 @@ void removeDestinationIfExists(BigQueryOptions options) throws Exception { TableReference tempTableReference = createTempTableReference(project, tempTableID, Optional.ofNullable(queryTempDataset)); Table destTable = datasetService.getTable(tempTableReference); - System.out.println("remoooving" + tempTableReference); if (destTable != null) { - System.out.println("remoooving" + tempTableReference); datasetService.deleteTable(tempTableReference); } }