From 5a054d188836b243ceb5e8afa0c73429cb9176eb Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Fri, 26 Sep 2025 16:48:28 +0530 Subject: [PATCH 1/8] Made changes to fix infinite retry when dataset is not found --- .../DebeziumReadSchemaTransformProvider.java | 1 - .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 21 ++++---- .../gcp/bigquery/UpdateSchemaDestination.java | 32 ++++++++++-- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 41 +++++++++++++--- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 32 +++++++++++- .../io/gcp/bigquery/BigQueryHelpersTest.java | 49 ++++++++++++++++--- 6 files changed, 145 insertions(+), 31 deletions(-) diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index d5f3f98f3b5e..d85bb1a7dc54 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.schemas.Schema; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 129c8314fc80..63f566c01c31 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -632,20 +632,17 @@ static void verifyDatasetPresence(DatasetService datasetService, TableReference } static String getDatasetLocation( - DatasetService datasetService, String projectId, String datasetId) { - Dataset dataset; + DatasetService datasetService, String projectId, String datasetId) + throws IOException, InterruptedException { try { - dataset = datasetService.getDataset(projectId, datasetId); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException( - String.format( - "unable to obtain dataset for dataset %s in project %s", datasetId, projectId), - e); + Dataset dataset = datasetService.getDataset(projectId, datasetId); + return dataset.getLocation(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; } - return dataset.getLocation(); + // Remove the catch (Exception e) block entirely + // Let IOException bubble up naturally without wrapping it in RuntimeException } static void verifyTablePresence(DatasetService datasetService, TableReference table) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 65bb3bf11b1b..8055204e9502 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -26,6 +26,7 @@ import 
com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.cloud.hadoop.util.ApiErrorExtractor; import java.io.IOException; import java.util.List; import java.util.Map; @@ -302,9 +303,34 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( loadJobProjectId == null || loadJobProjectId.get() == null ? tableReference.getProjectId() : loadJobProjectId.get(); - String bqLocation = - BigQueryHelpers.getDatasetLocation( - datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); + String bqLocation; + try { + bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); + } catch (IOException e) { + ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + if (errorExtractor.itemNotFound(e)) { + throw new RuntimeException( + String.format( + "Dataset %s not found in project %s. Please ensure the dataset exists before running the pipeline.", + tableReference.getDatasetId(), tableReference.getProjectId()), + e); + } + // For other IOExceptions, wrap and throw + throw new RuntimeException( + String.format( + "Unable to get dataset location for dataset %s in project %s", + tableReference.getDatasetId(), tableReference.getProjectId()), + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format( + "Interrupted while getting dataset location for dataset %s in project %s", + tableReference.getDatasetId(), tableReference.getProjectId()), + e); + } BigQueryHelpers.PendingJob retryJob = new BigQueryHelpers.PendingJob( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index 061e66024e29..4c905ee34db3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -21,6 +21,7 @@ import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; +import com.google.cloud.hadoop.util.ApiErrorExtractor; import java.io.IOException; import java.io.Serializable; import java.util.Collection; @@ -318,6 +319,7 @@ private BigQueryHelpers.PendingJob startCopy( CreateDisposition createDisposition, @Nullable String kmsKey, @Nullable ValueProvider loadJobProjectId) { + JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() .setSourceTables(tempTables) @@ -329,14 +331,41 @@ private BigQueryHelpers.PendingJob startCopy( new EncryptionConfiguration().setKmsKeyName(kmsKey)); } - String bqLocation = - BigQueryHelpers.getDatasetLocation( - datasetService, ref.getProjectId(), ref.getDatasetId()); + // Move dataset location lookup OUTSIDE the retry loop and handle failures immediately + String bqLocation; + try { + bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, ref.getProjectId(), ref.getDatasetId()); + } catch (IOException e) { + ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + if (errorExtractor.itemNotFound(e)) { + throw new RuntimeException( + String.format( + "Dataset %s not found in project %s. 
Please ensure the dataset exists before running the pipeline.", + ref.getDatasetId(), ref.getProjectId()), + e); + } + // For other IOExceptions, wrap and throw + throw new RuntimeException( + String.format( + "Unable to get dataset location for dataset %s in project %s", + ref.getDatasetId(), ref.getProjectId()), + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format( + "Interrupted while getting dataset location for dataset %s in project %s", + ref.getDatasetId(), ref.getProjectId()), + e); + } String projectId = loadJobProjectId == null || loadJobProjectId.get() == null ? ref.getProjectId() : loadJobProjectId.get(); + BigQueryHelpers.PendingJob retryJob = new BigQueryHelpers.PendingJob( jobId -> { @@ -344,7 +373,7 @@ private BigQueryHelpers.PendingJob startCopy( new JobReference() .setProjectId(projectId) .setJobId(jobId.getJobId()) - .setLocation(bqLocation); + .setLocation(bqLocation); // Use pre-resolved location LOG.info( "Starting copy job for table {} using {}, job id iteration {}", ref, @@ -364,7 +393,7 @@ private BigQueryHelpers.PendingJob startCopy( new JobReference() .setProjectId(projectId) .setJobId(jobId.getJobId()) - .setLocation(bqLocation); + .setLocation(bqLocation); // Use pre-resolved location try { return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); } catch (InterruptedException e) { @@ -377,7 +406,7 @@ private BigQueryHelpers.PendingJob startCopy( new JobReference() .setProjectId(projectId) .setJobId(jobId.getJobId()) - .setLocation(bqLocation); + .setLocation(bqLocation); // Use pre-resolved location try { return jobService.getJob(jobRef); } catch (InterruptedException | IOException e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 288b94ce081b..5d2037536d16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -27,6 +27,7 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import com.google.auto.value.AutoValue; +import com.google.cloud.hadoop.util.ApiErrorExtractor; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -505,8 +506,35 @@ private PendingJob startLoad( loadJobProjectId == null || loadJobProjectId.get() == null ? ref.getProjectId() : loadJobProjectId.get(); - String bqLocation = - BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); + + String bqLocation; + try { + bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, ref.getProjectId(), ref.getDatasetId()); + } catch (IOException e) { + ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + if (errorExtractor.itemNotFound(e)) { + throw new RuntimeException( + String.format( + "Dataset %s not found in project %s. 
Please ensure the dataset exists before running the pipeline.", + ref.getDatasetId(), ref.getProjectId()), + e); + } + // For other IOExceptions, wrap and throw + throw new RuntimeException( + String.format( + "Unable to get dataset location for dataset %s in project %s", + ref.getDatasetId(), ref.getProjectId()), + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format( + "Interrupted while getting dataset location for dataset %s in project %s", + ref.getDatasetId(), ref.getProjectId()), + e); + } PendingJob retryJob = new PendingJob( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java index 0d36d7bb46d0..1bc6191600ec 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java @@ -18,15 +18,15 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.api.client.util.Data; -import com.google.api.services.bigquery.model.Clustering; -import com.google.api.services.bigquery.model.ErrorProto; -import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobReference; -import com.google.api.services.bigquery.model.JobStatus; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.*; +import java.io.IOException; import java.util.Arrays; import java.util.Optional; import java.util.Random; @@ -271,4 +271,39 @@ public void testClusteringJsonConversion() { assertEquals(clustering, BigQueryHelpers.clusteringFromJsonFields(jsonClusteringFields)); } + + @Test + public void testGetDatasetLocationWithNonExistentDataset() + throws IOException, InterruptedException { + BigQueryServices.DatasetService mockDatasetService = + mock(BigQueryServices.DatasetService.class); + + IOException notFoundException = new IOException("Dataset not found"); + when(mockDatasetService.getDataset("project", "nonexistent_dataset")) + .thenThrow(notFoundException); + + try { + BigQueryHelpers.getDatasetLocation(mockDatasetService, "project", "nonexistent_dataset"); + fail("Expected IOException to be thrown"); + } catch (IOException e) { + assertEquals("Dataset not found", e.getMessage()); + // Verify that getDataset was called only once (the IOException is not wrapped and re-thrown) + verify(mockDatasetService, times(1)).getDataset("project", "nonexistent_dataset"); + } + } + + @Test + public void testGetDatasetLocationWithValidDataset() throws IOException, InterruptedException { + BigQueryServices.DatasetService mockDatasetService = + mock(BigQueryServices.DatasetService.class); + Dataset mockDataset = new Dataset().setLocation("US"); + + when(mockDatasetService.getDataset("project", "existing_dataset")).thenReturn(mockDataset); + + String location = + BigQueryHelpers.getDatasetLocation(mockDatasetService, "project", "existing_dataset"); + + assertEquals("US", location); + verify(mockDatasetService, 
times(1)).getDataset("project", "existing_dataset"); + } } From 4a8898bfee197efd8815ea61b1d79e7bc5dd1ebc Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Fri, 26 Sep 2025 17:16:18 +0530 Subject: [PATCH 2/8] Minor changes --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 2 -- .../beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java | 2 +- .../org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java | 8 +++----- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 63f566c01c31..36757cf61c8f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -641,8 +641,6 @@ static String getDatasetLocation( Thread.currentThread().interrupt(); throw e; } - // Remove the catch (Exception e) block entirely - // Let IOException bubble up naturally without wrapping it in RuntimeException } static void verifyTablePresence(DatasetService datasetService, TableReference table) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 8055204e9502..4a2c25269548 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -317,7 +317,7 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( tableReference.getDatasetId(), tableReference.getProjectId()), e); } - // For other IOExceptions, wrap and throw + throw new RuntimeException( String.format( "Unable to get dataset location for dataset %s in project %s", diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index 4c905ee34db3..9f37b6450aac 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -331,7 +331,6 @@ private BigQueryHelpers.PendingJob startCopy( new EncryptionConfiguration().setKmsKeyName(kmsKey)); } - // Move dataset location lookup OUTSIDE the retry loop and handle failures immediately String bqLocation; try { bqLocation = @@ -346,7 +345,6 @@ private BigQueryHelpers.PendingJob startCopy( ref.getDatasetId(), ref.getProjectId()), e); } - // For other IOExceptions, wrap and throw throw new RuntimeException( String.format( "Unable to get dataset location for dataset %s in project %s", @@ -373,7 +371,7 @@ private BigQueryHelpers.PendingJob startCopy( new JobReference() .setProjectId(projectId) .setJobId(jobId.getJobId()) - .setLocation(bqLocation); // Use pre-resolved location + .setLocation(bqLocation); LOG.info( "Starting copy job for table {} using {}, job id iteration {}", ref, @@ -393,7 +391,7 @@ private BigQueryHelpers.PendingJob startCopy( new JobReference() .setProjectId(projectId) .setJobId(jobId.getJobId()) - 
.setLocation(bqLocation); // Use pre-resolved location + .setLocation(bqLocation); try { return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); } catch (InterruptedException e) { @@ -406,7 +404,7 @@ private BigQueryHelpers.PendingJob startCopy( new JobReference() .setProjectId(projectId) .setJobId(jobId.getJobId()) - .setLocation(bqLocation); // Use pre-resolved location + .setLocation(bqLocation); try { return jobService.getJob(jobRef); } catch (InterruptedException | IOException e) { From 9dc152ca691fb53c4a6fe68ac9cc279b11937350 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sun, 28 Sep 2025 11:26:21 +0530 Subject: [PATCH 3/8] Import changes --- .../beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java index 1bc6191600ec..a6df4747c039 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java @@ -25,7 +25,14 @@ import static org.mockito.Mockito.when; import com.google.api.client.util.Data; -import com.google.api.services.bigquery.model.*; +import com.google.api.services.bigquery.model.Clustering; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.ErrorProto; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; import java.io.IOException; import java.util.Arrays; import java.util.Optional; From 5143c918b6a12497024a9e1ab6b512ca5a222e28 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Wed, 24 Dec 2025 15:51:05 +0530 Subject: [PATCH 4/8] reverting changes --- .../gcp/bigquery/UpdateSchemaDestination.java | 316 ++++++------ .../beam/sdk/io/gcp/bigquery/WriteRename.java | 33 +- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 456 ++++++++---------- 3 files changed, 362 insertions(+), 443 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 4a2c25269548..a4004c943cf5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -26,7 +26,6 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; -import com.google.cloud.hadoop.util.ApiErrorExtractor; import java.io.IOException; import java.util.List; import java.util.Map; @@ -61,9 +60,9 @@ */ @SuppressWarnings({"nullness"}) public class UpdateSchemaDestination - extends DoFn< - Iterable>, - Iterable>> { + extends DoFn< + Iterable>, + Iterable>> { private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class); private final 
BigQueryServices bqServices; @@ -84,9 +83,9 @@ private static class PendingJobData { final BoundedWindow window; public PendingJobData( - BigQueryHelpers.PendingJob retryJob, - TableDestination tableDestination, - BoundedWindow window) { + BigQueryHelpers.PendingJob retryJob, + TableDestination tableDestination, + BoundedWindow window) { this.retryJob = retryJob; this.tableDestination = tableDestination; this.window = window; @@ -96,15 +95,15 @@ public PendingJobData( private final Map pendingJobs = Maps.newHashMap(); public UpdateSchemaDestination( - BigQueryServices bqServices, - PCollectionView zeroLoadJobIdPrefixView, - @Nullable ValueProvider loadJobProjectId, - BigQueryIO.Write.WriteDisposition writeDisposition, - BigQueryIO.Write.CreateDisposition createDisposition, - int maxRetryJobs, - @Nullable String kmsKey, - Set schemaUpdateOptions, - DynamicDestinations dynamicDestinations) { + BigQueryServices bqServices, + PCollectionView zeroLoadJobIdPrefixView, + @Nullable ValueProvider loadJobProjectId, + BigQueryIO.Write.WriteDisposition writeDisposition, + BigQueryIO.Write.CreateDisposition createDisposition, + int maxRetryJobs, + @Nullable String kmsKey, + Set schemaUpdateOptions, + DynamicDestinations dynamicDestinations) { this.loadJobProjectId = loadJobProjectId; this.zeroLoadJobIdPrefixView = zeroLoadJobIdPrefixView; this.bqServices = bqServices; @@ -124,8 +123,8 @@ public void startBundle(StartBundleContext c) { TableDestination getTableWithDefaultProject(DestinationT destination) { if (dynamicDestinations.getPipelineOptions() == null) { throw new IllegalStateException( - "Unexpected null pipeline option for DynamicDestination object. " - + "Need to call setSideInputAccessorFromProcessContext(context) before use it."); + "Unexpected null pipeline option for DynamicDestination object. " + + "Need to call setSideInputAccessorFromProcessContext(context) before use it."); } BigQueryOptions options = dynamicDestinations.getPipelineOptions().as(BigQueryOptions.class); TableDestination tableDestination = dynamicDestinations.getTable(destination); @@ -133,9 +132,9 @@ TableDestination getTableWithDefaultProject(DestinationT destination) { if (Strings.isNullOrEmpty(tableReference.getProjectId())) { tableReference.setProjectId( - options.getBigQueryProject() == null - ? options.getProject() - : options.getBigQueryProject()); + options.getBigQueryProject() == null + ? 
options.getProject() + : options.getBigQueryProject()); tableDestination = tableDestination.withTableReference(tableReference); } @@ -144,10 +143,10 @@ TableDestination getTableWithDefaultProject(DestinationT destination) { @ProcessElement public void processElement( - @Element Iterable> element, - ProcessContext context, - BoundedWindow window) - throws IOException { + @Element Iterable> element, + ProcessContext context, + BoundedWindow window) + throws IOException { dynamicDestinations.setSideInputAccessorFromProcessContext(context); List> outputs = Lists.newArrayList(); for (KV entry : element) { @@ -161,33 +160,33 @@ public void processElement( TableSchema schema = dynamicDestinations.getSchema(destination); TableReference tableReference = tableDestination.getTableReference(); String jobIdPrefix = - BigQueryResourceNaming.createJobIdWithDestination( - context.sideInput(zeroLoadJobIdPrefixView), - tableDestination, - 1, - context.pane().getIndex()); + BigQueryResourceNaming.createJobIdWithDestination( + context.sideInput(zeroLoadJobIdPrefixView), + tableDestination, + 1, + context.pane().getIndex()); BigQueryHelpers.PendingJob updateSchemaDestinationJob = - startZeroLoadJob( - getJobService(context.getPipelineOptions().as(BigQueryOptions.class)), - getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - tableReference, - tableDestination.getTimePartitioning(), - tableDestination.getClustering(), - schema, - writeDisposition, - createDisposition, - schemaUpdateOptions); + startZeroLoadJob( + getJobService(context.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + tableReference, + tableDestination.getTimePartitioning(), + tableDestination.getClustering(), + schema, + writeDisposition, + createDisposition, + schemaUpdateOptions); if (updateSchemaDestinationJob != null) { pendingJobs.put( - destination, new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); + destination, new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); } } if (!pendingJobs.isEmpty()) { LOG.info( - "Added {} pending jobs to update the schema for each destination before copying {} temp tables.", - pendingJobs.size(), - outputs.size()); + "Added {} pending jobs to update the schema for each destination before copying {} temp tables.", + pendingJobs.size(), + outputs.size()); } context.output(outputs); } @@ -211,61 +210,61 @@ public void onTeardown() { @FinishBundle public void finishBundle(FinishBundleContext context) throws Exception { DatasetService datasetService = - getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)); + getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)); BigQueryHelpers.PendingJobManager jobManager = new BigQueryHelpers.PendingJobManager(); for (final PendingJobData pendingJobData : pendingJobs.values()) { jobManager = - jobManager.addPendingJob( - pendingJobData.retryJob, - j -> { - try { - if (pendingJobData.tableDestination.getTableDescription() != null) { - TableReference ref = pendingJobData.tableDestination.getTableReference(); - datasetService.patchTableDescription( - ref.clone() - .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), - pendingJobData.tableDestination.getTableDescription()); - } - } catch (IOException | InterruptedException e) { - return e; - } - return null; - }); + jobManager.addPendingJob( + pendingJobData.retryJob, + j -> { + try { + if 
(pendingJobData.tableDestination.getTableDescription() != null) { + TableReference ref = pendingJobData.tableDestination.getTableReference(); + datasetService.patchTableDescription( + ref.clone() + .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), + pendingJobData.tableDestination.getTableDescription()); + } + } catch (IOException | InterruptedException e) { + return e; + } + return null; + }); } jobManager.waitForDone(); } private BigQueryHelpers.PendingJob startZeroLoadJob( - BigQueryServices.JobService jobService, - DatasetService datasetService, - String jobIdPrefix, - TableReference tableReference, - TimePartitioning timePartitioning, - Clustering clustering, - @Nullable TableSchema schema, - BigQueryIO.Write.WriteDisposition writeDisposition, - BigQueryIO.Write.CreateDisposition createDisposition, - Set schemaUpdateOptions) { + BigQueryServices.JobService jobService, + DatasetService datasetService, + String jobIdPrefix, + TableReference tableReference, + TimePartitioning timePartitioning, + Clustering clustering, + @Nullable TableSchema schema, + BigQueryIO.Write.WriteDisposition writeDisposition, + BigQueryIO.Write.CreateDisposition createDisposition, + Set schemaUpdateOptions) { JobConfigurationLoad loadConfig = - new JobConfigurationLoad() - .setDestinationTable(tableReference) - .setSchema(schema) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat("NEWLINE_DELIMITED_JSON"); + new JobConfigurationLoad() + .setDestinationTable(tableReference) + .setSchema(schema) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat("NEWLINE_DELIMITED_JSON"); if (schemaUpdateOptions != null) { List options = - schemaUpdateOptions.stream() - .map(BigQueryIO.Write.SchemaUpdateOption::name) - .collect(Collectors.toList()); + schemaUpdateOptions.stream() + .map(BigQueryIO.Write.SchemaUpdateOption::name) + .collect(Collectors.toList()); loadConfig.setSchemaUpdateOptions(options); } if (!loadConfig - .getWriteDisposition() - .equals(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE.toString()) - && !loadConfig - .getWriteDisposition() - .equals(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString())) { + .getWriteDisposition() + .equals(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE.toString()) + && !loadConfig + .getWriteDisposition() + .equals(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString())) { return null; } final Table destinationTable; @@ -282,9 +281,9 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( // or when destination schema is null (the write will set the schema) // or when provided schema is null (e.g. when using CREATE_NEVER disposition) if (destinationTable.getSchema() == null - || destinationTable.getSchema().isEmpty() - || destinationTable.getSchema().equals(schema) - || schema == null) { + || destinationTable.getSchema().isEmpty() + || destinationTable.getSchema().equals(schema) + || schema == null) { return null; } if (timePartitioning != null) { @@ -297,92 +296,67 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( if (kmsKey != null) { loadConfig.setDestinationEncryptionConfiguration( - new EncryptionConfiguration().setKmsKeyName(kmsKey)); + new EncryptionConfiguration().setKmsKeyName(kmsKey)); } String projectId = - loadJobProjectId == null || loadJobProjectId.get() == null - ? 
tableReference.getProjectId() - : loadJobProjectId.get(); - String bqLocation; - try { - bqLocation = - BigQueryHelpers.getDatasetLocation( - datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); - } catch (IOException e) { - ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - if (errorExtractor.itemNotFound(e)) { - throw new RuntimeException( - String.format( - "Dataset %s not found in project %s. Please ensure the dataset exists before running the pipeline.", - tableReference.getDatasetId(), tableReference.getProjectId()), - e); - } - - throw new RuntimeException( - String.format( - "Unable to get dataset location for dataset %s in project %s", - tableReference.getDatasetId(), tableReference.getProjectId()), - e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - String.format( - "Interrupted while getting dataset location for dataset %s in project %s", - tableReference.getDatasetId(), tableReference.getProjectId()), - e); - } + loadJobProjectId == null || loadJobProjectId.get() == null + ? tableReference.getProjectId() + : loadJobProjectId.get(); + String bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); BigQueryHelpers.PendingJob retryJob = - new BigQueryHelpers.PendingJob( - // Function to load the data. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - LOG.info( - "Loading zero rows using job {}, job id {} iteration {}", - tableReference, - jobRef, - jobId.getRetryIndex()); - try { - jobService.startLoadJob( - jobRef, loadConfig, new ByteArrayContent("text/plain", new byte[0])); - } catch (IOException | InterruptedException e) { - LOG.warn("Schema update load job {} failed with {}", jobRef, e.toString()); - throw new RuntimeException(e); - } - return null; - }, - // Function to poll the result of a load job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - // Function to lookup a job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.getJob(jobRef); - } catch (InterruptedException | IOException e) { - throw new RuntimeException(e); - } - }, - maxRetryJobs, - jobIdPrefix); + new BigQueryHelpers.PendingJob( + // Function to load the data. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + LOG.info( + "Loading zero rows using job {}, job id {} iteration {}", + tableReference, + jobRef, + jobId.getRetryIndex()); + try { + jobService.startLoadJob( + jobRef, loadConfig, new ByteArrayContent("text/plain", new byte[0])); + } catch (IOException | InterruptedException e) { + LOG.warn("Schema update load job {} failed with {}", jobRef, e.toString()); + throw new RuntimeException(e); + } + return null; + }, + // Function to poll the result of a load job. 
+ jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + // Function to lookup a job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.getJob(jobRef); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }, + maxRetryJobs, + jobIdPrefix); return retryJob; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index 9f37b6450aac..061e66024e29 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -21,7 +21,6 @@ import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; -import com.google.cloud.hadoop.util.ApiErrorExtractor; import java.io.IOException; import java.io.Serializable; import java.util.Collection; @@ -319,7 +318,6 @@ private BigQueryHelpers.PendingJob startCopy( CreateDisposition createDisposition, @Nullable String kmsKey, @Nullable ValueProvider loadJobProjectId) { - JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() .setSourceTables(tempTables) @@ -331,39 +329,14 @@ private BigQueryHelpers.PendingJob startCopy( new EncryptionConfiguration().setKmsKeyName(kmsKey)); } - String bqLocation; - try { - bqLocation = - BigQueryHelpers.getDatasetLocation( - datasetService, ref.getProjectId(), ref.getDatasetId()); - } catch (IOException e) { - ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - if (errorExtractor.itemNotFound(e)) { - throw new RuntimeException( - String.format( - "Dataset %s not found in project %s. Please ensure the dataset exists before running the pipeline.", - ref.getDatasetId(), ref.getProjectId()), - e); - } - throw new RuntimeException( - String.format( - "Unable to get dataset location for dataset %s in project %s", - ref.getDatasetId(), ref.getProjectId()), - e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - String.format( - "Interrupted while getting dataset location for dataset %s in project %s", - ref.getDatasetId(), ref.getProjectId()), - e); - } + String bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, ref.getProjectId(), ref.getDatasetId()); String projectId = loadJobProjectId == null || loadJobProjectId.get() == null ? 
ref.getProjectId() : loadJobProjectId.get(); - BigQueryHelpers.PendingJob retryJob = new BigQueryHelpers.PendingJob( jobId -> { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 5d2037536d16..fb42f540e756 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -27,7 +27,6 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import com.google.auto.value.AutoValue; -import com.google.cloud.hadoop.util.ApiErrorExtractor; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -95,9 +94,9 @@ * {@link KV} maps the final table to itself. */ class WriteTables - extends PTransform< - PCollection, WritePartition.Result>>, - PCollection>> { + extends PTransform< + PCollection, WritePartition.Result>>, + PCollection>> { @AutoValue abstract static class Result { abstract String getTableName(); @@ -117,7 +116,7 @@ public void encode(Result value, OutputStream outStream) throws CoderException, @Override public Result decode(InputStream inStream) throws CoderException, IOException { return new AutoValue_WriteTables_Result( - StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream)); + StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream)); } } @@ -144,7 +143,7 @@ public Result decode(InputStream inStream) throws CoderException, IOException { private final @Nullable String tempDataset; private class WriteTablesDoFn - extends DoFn, WritePartition.Result>, KV> { + extends DoFn, WritePartition.Result>, KV> { private Map jsonSchemas = Maps.newHashMap(); @@ -159,13 +158,13 @@ private class PendingJobData { final boolean isFirstPane; public PendingJobData( - BoundedWindow window, - BigQueryHelpers.PendingJob retryJob, - List partitionFiles, - TableDestination tableDestination, - TableReference tableReference, - DestinationT destinationT, - boolean isFirstPane) { + BoundedWindow window, + BigQueryHelpers.PendingJob retryJob, + List partitionFiles, + TableDestination tableDestination, + TableReference tableReference, + DestinationT destinationT, + boolean isFirstPane) { this.window = window; this.retryJob = retryJob; this.partitionFiles = partitionFiles; @@ -188,10 +187,10 @@ public void startBundle(StartBundleContext c) { @ProcessElement public void processElement( - @Element KV, WritePartition.Result> element, - ProcessContext c, - BoundedWindow window) - throws Exception { + @Element KV, WritePartition.Result> element, + ProcessContext c, + BoundedWindow window) + throws Exception { dynamicDestinations.setSideInputAccessorFromProcessContext(c); DestinationT destination = c.element().getKey().getKey(); TableSchema tableSchema; @@ -200,54 +199,54 @@ public void processElement( } else if (jsonSchemas.containsKey(destination)) { // tableSchema for the destination stored in cache (jsonSchemas) tableSchema = - BigQueryHelpers.fromJsonString(jsonSchemas.get(destination), TableSchema.class); + BigQueryHelpers.fromJsonString(jsonSchemas.get(destination), TableSchema.class); } else { tableSchema = dynamicDestinations.getSchema(destination); Preconditions.checkArgumentNotNull( - tableSchema, - "Unless create disposition is %s, a schema must be 
specified, i.e. " - + "DynamicDestinations.getSchema() may not return null. " - + "However, create disposition is %s, and %s returned null for destination %s", - CreateDisposition.CREATE_NEVER, - firstPaneCreateDisposition, - dynamicDestinations, - destination); + tableSchema, + "Unless create disposition is %s, a schema must be specified, i.e. " + + "DynamicDestinations.getSchema() may not return null. " + + "However, create disposition is %s, and %s returned null for destination %s", + CreateDisposition.CREATE_NEVER, + firstPaneCreateDisposition, + dynamicDestinations, + destination); LOG.debug("Fetched TableSchema for table {}:\n\t{}", destination, tableSchema); jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema)); } TableDestination tableDestination = dynamicDestinations.getTable(destination); checkArgument( - tableDestination != null, - "DynamicDestinations.getTable() may not return null, " - + "but %s returned null for destination %s", - dynamicDestinations, - destination); + tableDestination != null, + "DynamicDestinations.getTable() may not return null, " + + "but %s returned null for destination %s", + dynamicDestinations, + destination); boolean destinationCoderSupportsClustering = - !(dynamicDestinations.getDestinationCoder() instanceof TableDestinationCoderV2); + !(dynamicDestinations.getDestinationCoder() instanceof TableDestinationCoderV2); checkArgument( - tableDestination.getClustering() == null || destinationCoderSupportsClustering, - "DynamicDestinations.getTable() may only return destinations with clustering configured" - + " if a destination coder is supplied that supports clustering, but %s is configured" - + " to use TableDestinationCoderV2. Set withClustering() on BigQueryIO.write() and, " - + " if you provided a custom DynamicDestinations instance, override" - + " getDestinationCoder() to return TableDestinationCoderV3.", - dynamicDestinations); + tableDestination.getClustering() == null || destinationCoderSupportsClustering, + "DynamicDestinations.getTable() may only return destinations with clustering configured" + + " if a destination coder is supplied that supports clustering, but %s is configured" + + " to use TableDestinationCoderV2. Set withClustering() on BigQueryIO.write() and, " + + " if you provided a custom DynamicDestinations instance, override" + + " getDestinationCoder() to return TableDestinationCoderV3.", + dynamicDestinations); TableReference tableReference = tableDestination.getTableReference(); if (Strings.isNullOrEmpty(tableReference.getProjectId())) { BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class); tableReference.setProjectId( - options.getBigQueryProject() == null - ? options.getProject() - : options.getBigQueryProject()); + options.getBigQueryProject() == null + ? 
options.getProject() + : options.getBigQueryProject()); tableDestination = tableDestination.withTableReference(tableReference); } Integer partition = element.getKey().getShardNumber(); List partitionFiles = Lists.newArrayList(element.getValue().getFilenames()); String jobIdPrefix = - BigQueryResourceNaming.createJobIdWithDestination( - c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex()); + BigQueryResourceNaming.createJobIdWithDestination( + c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex()); if (tempTable) { if (tempDataset != null) { @@ -257,10 +256,10 @@ public void processElement( tableReference.setTableId(jobIdPrefix); } else { Lineage.getSinks() - .add( - "bigquery", - BigQueryHelpers.dataCatalogSegments( - tableReference, c.getPipelineOptions().as(BigQueryOptions.class))); + .add( + "bigquery", + BigQueryHelpers.dataCatalogSegments( + tableReference, c.getPipelineOptions().as(BigQueryOptions.class))); } WriteDisposition writeDisposition = firstPaneWriteDisposition; @@ -278,28 +277,28 @@ public void processElement( } BigQueryHelpers.PendingJob retryJob = - startLoad( - getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - tableReference, - tableDestination.getTimePartitioning(), - tableDestination.getClustering(), - tableSchema, - partitionFiles, - writeDisposition, - createDisposition, - schemaUpdateOptions); + startLoad( + getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + tableReference, + tableDestination.getTimePartitioning(), + tableDestination.getClustering(), + tableSchema, + partitionFiles, + writeDisposition, + createDisposition, + schemaUpdateOptions); pendingJobs.add( - new PendingJobData( - window, - retryJob, - partitionFiles, - tableDestination, - tableReference, - destination, - element.getValue().isFirstPane())); + new PendingJobData( + window, + retryJob, + partitionFiles, + tableDestination, + tableReference, + destination, + element.getValue().isFirstPane())); } @Teardown @@ -335,52 +334,52 @@ private JobService getJobService(PipelineOptions pipelineOptions) throws IOExcep @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add( - DisplayData.item("launchesBigQueryJobs", true) - .withLabel("This transform launches BigQuery jobs to read/write elements.")); + DisplayData.item("launchesBigQueryJobs", true) + .withLabel("This transform launches BigQuery jobs to read/write elements.")); } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { DatasetService datasetService = - getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); PendingJobManager jobManager = new PendingJobManager(); for (final PendingJobData pendingJob : pendingJobs) { jobManager = - jobManager.addPendingJob( - pendingJob.retryJob, - // Lambda called when the job is done. 
- j -> { - try { - if (pendingJob.tableDestination.getTableDescription() != null) { - TableReference ref = pendingJob.tableReference; - datasetService.patchTableDescription( - ref.clone() - .setTableId( - BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), - pendingJob.tableDestination.getTableDescription()); - } - Result result = - new AutoValue_WriteTables_Result( - BigQueryHelpers.toJsonString(pendingJob.tableReference), - pendingJob.isFirstPane); - c.output( - mainOutputTag, - KV.of(pendingJob.destinationT, result), - pendingJob.window.maxTimestamp(), - pendingJob.window); - for (String file : pendingJob.partitionFiles) { - c.output( - temporaryFilesTag, - file, - pendingJob.window.maxTimestamp(), - pendingJob.window); - } - return null; - } catch (IOException | InterruptedException e) { - return e; - } - }); + jobManager.addPendingJob( + pendingJob.retryJob, + // Lambda called when the job is done. + j -> { + try { + if (pendingJob.tableDestination.getTableDescription() != null) { + TableReference ref = pendingJob.tableReference; + datasetService.patchTableDescription( + ref.clone() + .setTableId( + BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), + pendingJob.tableDestination.getTableDescription()); + } + Result result = + new AutoValue_WriteTables_Result( + BigQueryHelpers.toJsonString(pendingJob.tableReference), + pendingJob.isFirstPane); + c.output( + mainOutputTag, + KV.of(pendingJob.destinationT, result), + pendingJob.window.maxTimestamp(), + pendingJob.window); + for (String file : pendingJob.partitionFiles) { + c.output( + temporaryFilesTag, + file, + pendingJob.window.maxTimestamp(), + pendingJob.window); + } + return null; + } catch (IOException | InterruptedException e) { + return e; + } + }); } jobManager.waitForDone(); } @@ -394,21 +393,21 @@ public void processElement(ProcessContext c) throws Exception { } public WriteTables( - boolean tempTable, - BigQueryServices bqServices, - PCollectionView loadJobIdPrefixView, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - List> sideInputs, - DynamicDestinations dynamicDestinations, - @Nullable ValueProvider loadJobProjectId, - int maxRetryJobs, - boolean ignoreUnknownValues, - @Nullable String kmsKey, - String sourceFormat, - boolean useAvroLogicalTypes, - Set schemaUpdateOptions, - @Nullable String tempDataset) { + boolean tempTable, + BigQueryServices bqServices, + PCollectionView loadJobIdPrefixView, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + List> sideInputs, + DynamicDestinations dynamicDestinations, + @Nullable ValueProvider loadJobProjectId, + int maxRetryJobs, + boolean ignoreUnknownValues, + @Nullable String kmsKey, + String sourceFormat, + boolean useAvroLogicalTypes, + Set schemaUpdateOptions, + @Nullable String tempDataset) { this.tempTable = tempTable; this.bqServices = bqServices; @@ -431,12 +430,12 @@ public WriteTables( @Override public PCollection> expand( - PCollection, WritePartition.Result>> input) { + PCollection, WritePartition.Result>> input) { PCollectionTuple writeTablesOutputs = - input.apply( - ParDo.of(new WriteTablesDoFn()) - .withSideInputs(sideInputs) - .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag))); + input.apply( + ParDo.of(new WriteTablesDoFn()) + .withSideInputs(sideInputs) + .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag))); // Garbage collect temporary files. 
// We mustn't start garbage collecting files until we are assured that the WriteTablesDoFn has @@ -445,51 +444,51 @@ public PCollection> expand( // to missing files, causing either the entire workflow to fail or get stuck (depending on how // the runner handles persistent failures). writeTablesOutputs - .get(temporaryFilesTag) - .setCoder(StringUtf8Coder.of()) - .apply(WithKeys.of((Void) null)) - .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) - .apply( - Window.>into(new GlobalWindows()) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) - .discardingFiredPanes()) - .apply(GroupByKey.create()) - .apply(Values.create()) - .apply(ParDo.of(new GarbageCollectTemporaryFiles())); + .get(temporaryFilesTag) + .setCoder(StringUtf8Coder.of()) + .apply(WithKeys.of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) + .apply( + Window.>into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes()) + .apply(GroupByKey.create()) + .apply(Values.create()) + .apply(ParDo.of(new GarbageCollectTemporaryFiles())); return writeTablesOutputs.get(mainOutputTag); } private PendingJob startLoad( - JobService jobService, - DatasetService datasetService, - String jobIdPrefix, - TableReference ref, - @Nullable TimePartitioning timePartitioning, - @Nullable Clustering clustering, - @Nullable TableSchema schema, - List gcsUris, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - Set schemaUpdateOptions) { + JobService jobService, + DatasetService datasetService, + String jobIdPrefix, + TableReference ref, + @Nullable TimePartitioning timePartitioning, + @Nullable Clustering clustering, + @Nullable TableSchema schema, + List gcsUris, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + Set schemaUpdateOptions) { @SuppressWarnings({ "nullness" // nulls allowed in most fields but API client not annotated }) JobConfigurationLoad loadConfig = - new JobConfigurationLoad() - .setDestinationTable(ref) - .setSchema(schema) - .setSourceUris(gcsUris) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat(sourceFormat) - .setIgnoreUnknownValues(ignoreUnknownValues) - .setUseAvroLogicalTypes(useAvroLogicalTypes); + new JobConfigurationLoad() + .setDestinationTable(ref) + .setSchema(schema) + .setSourceUris(gcsUris) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat(sourceFormat) + .setIgnoreUnknownValues(ignoreUnknownValues) + .setUseAvroLogicalTypes(useAvroLogicalTypes); if (schemaUpdateOptions != null) { List options = - schemaUpdateOptions.stream() - .map(Enum::name) - .collect(Collectors.toList()); + schemaUpdateOptions.stream() + .map(Enum::name) + .collect(Collectors.toList()); loadConfig.setSchemaUpdateOptions(options); } if (timePartitioning != null) { @@ -500,93 +499,66 @@ private PendingJob startLoad( } if (kmsKey != null) { loadConfig.setDestinationEncryptionConfiguration( - new EncryptionConfiguration().setKmsKeyName(kmsKey)); + new EncryptionConfiguration().setKmsKeyName(kmsKey)); } String projectId = - loadJobProjectId == null || loadJobProjectId.get() == null - ? 
ref.getProjectId() - : loadJobProjectId.get(); - - String bqLocation; - try { - bqLocation = - BigQueryHelpers.getDatasetLocation( - datasetService, ref.getProjectId(), ref.getDatasetId()); - } catch (IOException e) { - ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - if (errorExtractor.itemNotFound(e)) { - throw new RuntimeException( - String.format( - "Dataset %s not found in project %s. Please ensure the dataset exists before running the pipeline.", - ref.getDatasetId(), ref.getProjectId()), - e); - } - // For other IOExceptions, wrap and throw - throw new RuntimeException( - String.format( - "Unable to get dataset location for dataset %s in project %s", - ref.getDatasetId(), ref.getProjectId()), - e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - String.format( - "Interrupted while getting dataset location for dataset %s in project %s", - ref.getDatasetId(), ref.getProjectId()), - e); - } + loadJobProjectId == null || loadJobProjectId.get() == null + ? ref.getProjectId() + : loadJobProjectId.get(); + String bqLocation = + BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); PendingJob retryJob = - new PendingJob( - // Function to load the data. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - LOG.info( - "Loading {} files into {} using job {}, job id iteration {}", - gcsUris.size(), - ref, - jobRef, - jobId.getRetryIndex()); - try { - jobService.startLoadJob(jobRef, loadConfig); - } catch (IOException | InterruptedException e) { - LOG.warn("Load job {} failed with {}", jobRef, e.toString()); - throw new RuntimeException(e); - } - return null; - }, - // Function to poll the result of a load job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - // Function to lookup a job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.getJob(jobRef); - } catch (InterruptedException | IOException e) { - throw new RuntimeException(e); - } - }, - maxRetryJobs, - jobIdPrefix); + new PendingJob( + // Function to load the data. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + LOG.info( + "Loading {} files into {} using job {}, job id iteration {}", + gcsUris.size(), + ref, + jobRef, + jobId.getRetryIndex()); + try { + jobService.startLoadJob(jobRef, loadConfig); + } catch (IOException | InterruptedException e) { + LOG.warn("Load job {} failed with {}", jobRef, e.toString()); + throw new RuntimeException(e); + } + return null; + }, + // Function to poll the result of a load job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + // Function to lookup a job. 
+ jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.getJob(jobRef); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }, + maxRetryJobs, + jobIdPrefix); return retryJob; } From 9951f3a9aae8abaceffbeaabb91a551b82f24506 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Wed, 24 Dec 2025 15:57:37 +0530 Subject: [PATCH 5/8] spotless --- .../gcp/bigquery/UpdateSchemaDestination.java | 288 ++++++------ .../beam/sdk/io/gcp/bigquery/WriteTables.java | 426 +++++++++--------- 2 files changed, 357 insertions(+), 357 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index a4004c943cf5..65bb3bf11b1b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -60,9 +60,9 @@ */ @SuppressWarnings({"nullness"}) public class UpdateSchemaDestination - extends DoFn< - Iterable>, - Iterable>> { + extends DoFn< + Iterable>, + Iterable>> { private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class); private final BigQueryServices bqServices; @@ -83,9 +83,9 @@ private static class PendingJobData { final BoundedWindow window; public PendingJobData( - BigQueryHelpers.PendingJob retryJob, - TableDestination tableDestination, - BoundedWindow window) { + BigQueryHelpers.PendingJob retryJob, + TableDestination tableDestination, + BoundedWindow window) { this.retryJob = retryJob; this.tableDestination = tableDestination; this.window = window; @@ -95,15 +95,15 @@ public PendingJobData( private final Map pendingJobs = Maps.newHashMap(); public UpdateSchemaDestination( - BigQueryServices bqServices, - PCollectionView zeroLoadJobIdPrefixView, - @Nullable ValueProvider loadJobProjectId, - BigQueryIO.Write.WriteDisposition writeDisposition, - BigQueryIO.Write.CreateDisposition createDisposition, - int maxRetryJobs, - @Nullable String kmsKey, - Set schemaUpdateOptions, - DynamicDestinations dynamicDestinations) { + BigQueryServices bqServices, + PCollectionView zeroLoadJobIdPrefixView, + @Nullable ValueProvider loadJobProjectId, + BigQueryIO.Write.WriteDisposition writeDisposition, + BigQueryIO.Write.CreateDisposition createDisposition, + int maxRetryJobs, + @Nullable String kmsKey, + Set schemaUpdateOptions, + DynamicDestinations dynamicDestinations) { this.loadJobProjectId = loadJobProjectId; this.zeroLoadJobIdPrefixView = zeroLoadJobIdPrefixView; this.bqServices = bqServices; @@ -123,8 +123,8 @@ public void startBundle(StartBundleContext c) { TableDestination getTableWithDefaultProject(DestinationT destination) { if (dynamicDestinations.getPipelineOptions() == null) { throw new IllegalStateException( - "Unexpected null pipeline option for DynamicDestination object. " - + "Need to call setSideInputAccessorFromProcessContext(context) before use it."); + "Unexpected null pipeline option for DynamicDestination object. 
" + + "Need to call setSideInputAccessorFromProcessContext(context) before use it."); } BigQueryOptions options = dynamicDestinations.getPipelineOptions().as(BigQueryOptions.class); TableDestination tableDestination = dynamicDestinations.getTable(destination); @@ -132,9 +132,9 @@ TableDestination getTableWithDefaultProject(DestinationT destination) { if (Strings.isNullOrEmpty(tableReference.getProjectId())) { tableReference.setProjectId( - options.getBigQueryProject() == null - ? options.getProject() - : options.getBigQueryProject()); + options.getBigQueryProject() == null + ? options.getProject() + : options.getBigQueryProject()); tableDestination = tableDestination.withTableReference(tableReference); } @@ -143,10 +143,10 @@ TableDestination getTableWithDefaultProject(DestinationT destination) { @ProcessElement public void processElement( - @Element Iterable> element, - ProcessContext context, - BoundedWindow window) - throws IOException { + @Element Iterable> element, + ProcessContext context, + BoundedWindow window) + throws IOException { dynamicDestinations.setSideInputAccessorFromProcessContext(context); List> outputs = Lists.newArrayList(); for (KV entry : element) { @@ -160,33 +160,33 @@ public void processElement( TableSchema schema = dynamicDestinations.getSchema(destination); TableReference tableReference = tableDestination.getTableReference(); String jobIdPrefix = - BigQueryResourceNaming.createJobIdWithDestination( - context.sideInput(zeroLoadJobIdPrefixView), - tableDestination, - 1, - context.pane().getIndex()); + BigQueryResourceNaming.createJobIdWithDestination( + context.sideInput(zeroLoadJobIdPrefixView), + tableDestination, + 1, + context.pane().getIndex()); BigQueryHelpers.PendingJob updateSchemaDestinationJob = - startZeroLoadJob( - getJobService(context.getPipelineOptions().as(BigQueryOptions.class)), - getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - tableReference, - tableDestination.getTimePartitioning(), - tableDestination.getClustering(), - schema, - writeDisposition, - createDisposition, - schemaUpdateOptions); + startZeroLoadJob( + getJobService(context.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + tableReference, + tableDestination.getTimePartitioning(), + tableDestination.getClustering(), + schema, + writeDisposition, + createDisposition, + schemaUpdateOptions); if (updateSchemaDestinationJob != null) { pendingJobs.put( - destination, new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); + destination, new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); } } if (!pendingJobs.isEmpty()) { LOG.info( - "Added {} pending jobs to update the schema for each destination before copying {} temp tables.", - pendingJobs.size(), - outputs.size()); + "Added {} pending jobs to update the schema for each destination before copying {} temp tables.", + pendingJobs.size(), + outputs.size()); } context.output(outputs); } @@ -210,61 +210,61 @@ public void onTeardown() { @FinishBundle public void finishBundle(FinishBundleContext context) throws Exception { DatasetService datasetService = - getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)); + getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)); BigQueryHelpers.PendingJobManager jobManager = new BigQueryHelpers.PendingJobManager(); for (final PendingJobData pendingJobData : pendingJobs.values()) { 
jobManager = - jobManager.addPendingJob( - pendingJobData.retryJob, - j -> { - try { - if (pendingJobData.tableDestination.getTableDescription() != null) { - TableReference ref = pendingJobData.tableDestination.getTableReference(); - datasetService.patchTableDescription( - ref.clone() - .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), - pendingJobData.tableDestination.getTableDescription()); - } - } catch (IOException | InterruptedException e) { - return e; - } - return null; - }); + jobManager.addPendingJob( + pendingJobData.retryJob, + j -> { + try { + if (pendingJobData.tableDestination.getTableDescription() != null) { + TableReference ref = pendingJobData.tableDestination.getTableReference(); + datasetService.patchTableDescription( + ref.clone() + .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), + pendingJobData.tableDestination.getTableDescription()); + } + } catch (IOException | InterruptedException e) { + return e; + } + return null; + }); } jobManager.waitForDone(); } private BigQueryHelpers.PendingJob startZeroLoadJob( - BigQueryServices.JobService jobService, - DatasetService datasetService, - String jobIdPrefix, - TableReference tableReference, - TimePartitioning timePartitioning, - Clustering clustering, - @Nullable TableSchema schema, - BigQueryIO.Write.WriteDisposition writeDisposition, - BigQueryIO.Write.CreateDisposition createDisposition, - Set schemaUpdateOptions) { + BigQueryServices.JobService jobService, + DatasetService datasetService, + String jobIdPrefix, + TableReference tableReference, + TimePartitioning timePartitioning, + Clustering clustering, + @Nullable TableSchema schema, + BigQueryIO.Write.WriteDisposition writeDisposition, + BigQueryIO.Write.CreateDisposition createDisposition, + Set schemaUpdateOptions) { JobConfigurationLoad loadConfig = - new JobConfigurationLoad() - .setDestinationTable(tableReference) - .setSchema(schema) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat("NEWLINE_DELIMITED_JSON"); + new JobConfigurationLoad() + .setDestinationTable(tableReference) + .setSchema(schema) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat("NEWLINE_DELIMITED_JSON"); if (schemaUpdateOptions != null) { List options = - schemaUpdateOptions.stream() - .map(BigQueryIO.Write.SchemaUpdateOption::name) - .collect(Collectors.toList()); + schemaUpdateOptions.stream() + .map(BigQueryIO.Write.SchemaUpdateOption::name) + .collect(Collectors.toList()); loadConfig.setSchemaUpdateOptions(options); } if (!loadConfig - .getWriteDisposition() - .equals(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE.toString()) - && !loadConfig - .getWriteDisposition() - .equals(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString())) { + .getWriteDisposition() + .equals(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE.toString()) + && !loadConfig + .getWriteDisposition() + .equals(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString())) { return null; } final Table destinationTable; @@ -281,9 +281,9 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( // or when destination schema is null (the write will set the schema) // or when provided schema is null (e.g. 
when using CREATE_NEVER disposition) if (destinationTable.getSchema() == null - || destinationTable.getSchema().isEmpty() - || destinationTable.getSchema().equals(schema) - || schema == null) { + || destinationTable.getSchema().isEmpty() + || destinationTable.getSchema().equals(schema) + || schema == null) { return null; } if (timePartitioning != null) { @@ -296,67 +296,67 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( if (kmsKey != null) { loadConfig.setDestinationEncryptionConfiguration( - new EncryptionConfiguration().setKmsKeyName(kmsKey)); + new EncryptionConfiguration().setKmsKeyName(kmsKey)); } String projectId = - loadJobProjectId == null || loadJobProjectId.get() == null - ? tableReference.getProjectId() - : loadJobProjectId.get(); + loadJobProjectId == null || loadJobProjectId.get() == null + ? tableReference.getProjectId() + : loadJobProjectId.get(); String bqLocation = - BigQueryHelpers.getDatasetLocation( - datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); + BigQueryHelpers.getDatasetLocation( + datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); BigQueryHelpers.PendingJob retryJob = - new BigQueryHelpers.PendingJob( - // Function to load the data. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - LOG.info( - "Loading zero rows using job {}, job id {} iteration {}", - tableReference, - jobRef, - jobId.getRetryIndex()); - try { - jobService.startLoadJob( - jobRef, loadConfig, new ByteArrayContent("text/plain", new byte[0])); - } catch (IOException | InterruptedException e) { - LOG.warn("Schema update load job {} failed with {}", jobRef, e.toString()); - throw new RuntimeException(e); - } - return null; - }, - // Function to poll the result of a load job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - // Function to lookup a job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.getJob(jobRef); - } catch (InterruptedException | IOException e) { - throw new RuntimeException(e); - } - }, - maxRetryJobs, - jobIdPrefix); + new BigQueryHelpers.PendingJob( + // Function to load the data. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + LOG.info( + "Loading zero rows using job {}, job id {} iteration {}", + tableReference, + jobRef, + jobId.getRetryIndex()); + try { + jobService.startLoadJob( + jobRef, loadConfig, new ByteArrayContent("text/plain", new byte[0])); + } catch (IOException | InterruptedException e) { + LOG.warn("Schema update load job {} failed with {}", jobRef, e.toString()); + throw new RuntimeException(e); + } + return null; + }, + // Function to poll the result of a load job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + // Function to lookup a job. 
+ jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.getJob(jobRef); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }, + maxRetryJobs, + jobIdPrefix); return retryJob; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index fb42f540e756..288b94ce081b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -94,9 +94,9 @@ * {@link KV} maps the final table to itself. */ class WriteTables - extends PTransform< - PCollection, WritePartition.Result>>, - PCollection>> { + extends PTransform< + PCollection, WritePartition.Result>>, + PCollection>> { @AutoValue abstract static class Result { abstract String getTableName(); @@ -116,7 +116,7 @@ public void encode(Result value, OutputStream outStream) throws CoderException, @Override public Result decode(InputStream inStream) throws CoderException, IOException { return new AutoValue_WriteTables_Result( - StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream)); + StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream)); } } @@ -143,7 +143,7 @@ public Result decode(InputStream inStream) throws CoderException, IOException { private final @Nullable String tempDataset; private class WriteTablesDoFn - extends DoFn, WritePartition.Result>, KV> { + extends DoFn, WritePartition.Result>, KV> { private Map jsonSchemas = Maps.newHashMap(); @@ -158,13 +158,13 @@ private class PendingJobData { final boolean isFirstPane; public PendingJobData( - BoundedWindow window, - BigQueryHelpers.PendingJob retryJob, - List partitionFiles, - TableDestination tableDestination, - TableReference tableReference, - DestinationT destinationT, - boolean isFirstPane) { + BoundedWindow window, + BigQueryHelpers.PendingJob retryJob, + List partitionFiles, + TableDestination tableDestination, + TableReference tableReference, + DestinationT destinationT, + boolean isFirstPane) { this.window = window; this.retryJob = retryJob; this.partitionFiles = partitionFiles; @@ -187,10 +187,10 @@ public void startBundle(StartBundleContext c) { @ProcessElement public void processElement( - @Element KV, WritePartition.Result> element, - ProcessContext c, - BoundedWindow window) - throws Exception { + @Element KV, WritePartition.Result> element, + ProcessContext c, + BoundedWindow window) + throws Exception { dynamicDestinations.setSideInputAccessorFromProcessContext(c); DestinationT destination = c.element().getKey().getKey(); TableSchema tableSchema; @@ -199,54 +199,54 @@ public void processElement( } else if (jsonSchemas.containsKey(destination)) { // tableSchema for the destination stored in cache (jsonSchemas) tableSchema = - BigQueryHelpers.fromJsonString(jsonSchemas.get(destination), TableSchema.class); + BigQueryHelpers.fromJsonString(jsonSchemas.get(destination), TableSchema.class); } else { tableSchema = dynamicDestinations.getSchema(destination); Preconditions.checkArgumentNotNull( - tableSchema, - "Unless create disposition is %s, a schema must be specified, i.e. " - + "DynamicDestinations.getSchema() may not return null. 
" - + "However, create disposition is %s, and %s returned null for destination %s", - CreateDisposition.CREATE_NEVER, - firstPaneCreateDisposition, - dynamicDestinations, - destination); + tableSchema, + "Unless create disposition is %s, a schema must be specified, i.e. " + + "DynamicDestinations.getSchema() may not return null. " + + "However, create disposition is %s, and %s returned null for destination %s", + CreateDisposition.CREATE_NEVER, + firstPaneCreateDisposition, + dynamicDestinations, + destination); LOG.debug("Fetched TableSchema for table {}:\n\t{}", destination, tableSchema); jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema)); } TableDestination tableDestination = dynamicDestinations.getTable(destination); checkArgument( - tableDestination != null, - "DynamicDestinations.getTable() may not return null, " - + "but %s returned null for destination %s", - dynamicDestinations, - destination); + tableDestination != null, + "DynamicDestinations.getTable() may not return null, " + + "but %s returned null for destination %s", + dynamicDestinations, + destination); boolean destinationCoderSupportsClustering = - !(dynamicDestinations.getDestinationCoder() instanceof TableDestinationCoderV2); + !(dynamicDestinations.getDestinationCoder() instanceof TableDestinationCoderV2); checkArgument( - tableDestination.getClustering() == null || destinationCoderSupportsClustering, - "DynamicDestinations.getTable() may only return destinations with clustering configured" - + " if a destination coder is supplied that supports clustering, but %s is configured" - + " to use TableDestinationCoderV2. Set withClustering() on BigQueryIO.write() and, " - + " if you provided a custom DynamicDestinations instance, override" - + " getDestinationCoder() to return TableDestinationCoderV3.", - dynamicDestinations); + tableDestination.getClustering() == null || destinationCoderSupportsClustering, + "DynamicDestinations.getTable() may only return destinations with clustering configured" + + " if a destination coder is supplied that supports clustering, but %s is configured" + + " to use TableDestinationCoderV2. Set withClustering() on BigQueryIO.write() and, " + + " if you provided a custom DynamicDestinations instance, override" + + " getDestinationCoder() to return TableDestinationCoderV3.", + dynamicDestinations); TableReference tableReference = tableDestination.getTableReference(); if (Strings.isNullOrEmpty(tableReference.getProjectId())) { BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class); tableReference.setProjectId( - options.getBigQueryProject() == null - ? options.getProject() - : options.getBigQueryProject()); + options.getBigQueryProject() == null + ? 
options.getProject() + : options.getBigQueryProject()); tableDestination = tableDestination.withTableReference(tableReference); } Integer partition = element.getKey().getShardNumber(); List partitionFiles = Lists.newArrayList(element.getValue().getFilenames()); String jobIdPrefix = - BigQueryResourceNaming.createJobIdWithDestination( - c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex()); + BigQueryResourceNaming.createJobIdWithDestination( + c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex()); if (tempTable) { if (tempDataset != null) { @@ -256,10 +256,10 @@ public void processElement( tableReference.setTableId(jobIdPrefix); } else { Lineage.getSinks() - .add( - "bigquery", - BigQueryHelpers.dataCatalogSegments( - tableReference, c.getPipelineOptions().as(BigQueryOptions.class))); + .add( + "bigquery", + BigQueryHelpers.dataCatalogSegments( + tableReference, c.getPipelineOptions().as(BigQueryOptions.class))); } WriteDisposition writeDisposition = firstPaneWriteDisposition; @@ -277,28 +277,28 @@ public void processElement( } BigQueryHelpers.PendingJob retryJob = - startLoad( - getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - tableReference, - tableDestination.getTimePartitioning(), - tableDestination.getClustering(), - tableSchema, - partitionFiles, - writeDisposition, - createDisposition, - schemaUpdateOptions); + startLoad( + getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + tableReference, + tableDestination.getTimePartitioning(), + tableDestination.getClustering(), + tableSchema, + partitionFiles, + writeDisposition, + createDisposition, + schemaUpdateOptions); pendingJobs.add( - new PendingJobData( - window, - retryJob, - partitionFiles, - tableDestination, - tableReference, - destination, - element.getValue().isFirstPane())); + new PendingJobData( + window, + retryJob, + partitionFiles, + tableDestination, + tableReference, + destination, + element.getValue().isFirstPane())); } @Teardown @@ -334,52 +334,52 @@ private JobService getJobService(PipelineOptions pipelineOptions) throws IOExcep @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add( - DisplayData.item("launchesBigQueryJobs", true) - .withLabel("This transform launches BigQuery jobs to read/write elements.")); + DisplayData.item("launchesBigQueryJobs", true) + .withLabel("This transform launches BigQuery jobs to read/write elements.")); } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { DatasetService datasetService = - getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); PendingJobManager jobManager = new PendingJobManager(); for (final PendingJobData pendingJob : pendingJobs) { jobManager = - jobManager.addPendingJob( - pendingJob.retryJob, - // Lambda called when the job is done. 
- j -> { - try { - if (pendingJob.tableDestination.getTableDescription() != null) { - TableReference ref = pendingJob.tableReference; - datasetService.patchTableDescription( - ref.clone() - .setTableId( - BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), - pendingJob.tableDestination.getTableDescription()); - } - Result result = - new AutoValue_WriteTables_Result( - BigQueryHelpers.toJsonString(pendingJob.tableReference), - pendingJob.isFirstPane); - c.output( - mainOutputTag, - KV.of(pendingJob.destinationT, result), - pendingJob.window.maxTimestamp(), - pendingJob.window); - for (String file : pendingJob.partitionFiles) { - c.output( - temporaryFilesTag, - file, - pendingJob.window.maxTimestamp(), - pendingJob.window); - } - return null; - } catch (IOException | InterruptedException e) { - return e; - } - }); + jobManager.addPendingJob( + pendingJob.retryJob, + // Lambda called when the job is done. + j -> { + try { + if (pendingJob.tableDestination.getTableDescription() != null) { + TableReference ref = pendingJob.tableReference; + datasetService.patchTableDescription( + ref.clone() + .setTableId( + BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), + pendingJob.tableDestination.getTableDescription()); + } + Result result = + new AutoValue_WriteTables_Result( + BigQueryHelpers.toJsonString(pendingJob.tableReference), + pendingJob.isFirstPane); + c.output( + mainOutputTag, + KV.of(pendingJob.destinationT, result), + pendingJob.window.maxTimestamp(), + pendingJob.window); + for (String file : pendingJob.partitionFiles) { + c.output( + temporaryFilesTag, + file, + pendingJob.window.maxTimestamp(), + pendingJob.window); + } + return null; + } catch (IOException | InterruptedException e) { + return e; + } + }); } jobManager.waitForDone(); } @@ -393,21 +393,21 @@ public void processElement(ProcessContext c) throws Exception { } public WriteTables( - boolean tempTable, - BigQueryServices bqServices, - PCollectionView loadJobIdPrefixView, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - List> sideInputs, - DynamicDestinations dynamicDestinations, - @Nullable ValueProvider loadJobProjectId, - int maxRetryJobs, - boolean ignoreUnknownValues, - @Nullable String kmsKey, - String sourceFormat, - boolean useAvroLogicalTypes, - Set schemaUpdateOptions, - @Nullable String tempDataset) { + boolean tempTable, + BigQueryServices bqServices, + PCollectionView loadJobIdPrefixView, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + List> sideInputs, + DynamicDestinations dynamicDestinations, + @Nullable ValueProvider loadJobProjectId, + int maxRetryJobs, + boolean ignoreUnknownValues, + @Nullable String kmsKey, + String sourceFormat, + boolean useAvroLogicalTypes, + Set schemaUpdateOptions, + @Nullable String tempDataset) { this.tempTable = tempTable; this.bqServices = bqServices; @@ -430,12 +430,12 @@ public WriteTables( @Override public PCollection> expand( - PCollection, WritePartition.Result>> input) { + PCollection, WritePartition.Result>> input) { PCollectionTuple writeTablesOutputs = - input.apply( - ParDo.of(new WriteTablesDoFn()) - .withSideInputs(sideInputs) - .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag))); + input.apply( + ParDo.of(new WriteTablesDoFn()) + .withSideInputs(sideInputs) + .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag))); // Garbage collect temporary files. 
// We mustn't start garbage collecting files until we are assured that the WriteTablesDoFn has @@ -444,51 +444,51 @@ public PCollection> expand( // to missing files, causing either the entire workflow to fail or get stuck (depending on how // the runner handles persistent failures). writeTablesOutputs - .get(temporaryFilesTag) - .setCoder(StringUtf8Coder.of()) - .apply(WithKeys.of((Void) null)) - .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) - .apply( - Window.>into(new GlobalWindows()) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) - .discardingFiredPanes()) - .apply(GroupByKey.create()) - .apply(Values.create()) - .apply(ParDo.of(new GarbageCollectTemporaryFiles())); + .get(temporaryFilesTag) + .setCoder(StringUtf8Coder.of()) + .apply(WithKeys.of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) + .apply( + Window.>into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes()) + .apply(GroupByKey.create()) + .apply(Values.create()) + .apply(ParDo.of(new GarbageCollectTemporaryFiles())); return writeTablesOutputs.get(mainOutputTag); } private PendingJob startLoad( - JobService jobService, - DatasetService datasetService, - String jobIdPrefix, - TableReference ref, - @Nullable TimePartitioning timePartitioning, - @Nullable Clustering clustering, - @Nullable TableSchema schema, - List gcsUris, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - Set schemaUpdateOptions) { + JobService jobService, + DatasetService datasetService, + String jobIdPrefix, + TableReference ref, + @Nullable TimePartitioning timePartitioning, + @Nullable Clustering clustering, + @Nullable TableSchema schema, + List gcsUris, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + Set schemaUpdateOptions) { @SuppressWarnings({ "nullness" // nulls allowed in most fields but API client not annotated }) JobConfigurationLoad loadConfig = - new JobConfigurationLoad() - .setDestinationTable(ref) - .setSchema(schema) - .setSourceUris(gcsUris) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat(sourceFormat) - .setIgnoreUnknownValues(ignoreUnknownValues) - .setUseAvroLogicalTypes(useAvroLogicalTypes); + new JobConfigurationLoad() + .setDestinationTable(ref) + .setSchema(schema) + .setSourceUris(gcsUris) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat(sourceFormat) + .setIgnoreUnknownValues(ignoreUnknownValues) + .setUseAvroLogicalTypes(useAvroLogicalTypes); if (schemaUpdateOptions != null) { List options = - schemaUpdateOptions.stream() - .map(Enum::name) - .collect(Collectors.toList()); + schemaUpdateOptions.stream() + .map(Enum::name) + .collect(Collectors.toList()); loadConfig.setSchemaUpdateOptions(options); } if (timePartitioning != null) { @@ -499,66 +499,66 @@ private PendingJob startLoad( } if (kmsKey != null) { loadConfig.setDestinationEncryptionConfiguration( - new EncryptionConfiguration().setKmsKeyName(kmsKey)); + new EncryptionConfiguration().setKmsKeyName(kmsKey)); } String projectId = - loadJobProjectId == null || loadJobProjectId.get() == null - ? ref.getProjectId() - : loadJobProjectId.get(); + loadJobProjectId == null || loadJobProjectId.get() == null + ? 
ref.getProjectId() + : loadJobProjectId.get(); String bqLocation = - BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); + BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); PendingJob retryJob = - new PendingJob( - // Function to load the data. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - LOG.info( - "Loading {} files into {} using job {}, job id iteration {}", - gcsUris.size(), - ref, - jobRef, - jobId.getRetryIndex()); - try { - jobService.startLoadJob(jobRef, loadConfig); - } catch (IOException | InterruptedException e) { - LOG.warn("Load job {} failed with {}", jobRef, e.toString()); - throw new RuntimeException(e); - } - return null; - }, - // Function to poll the result of a load job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - // Function to lookup a job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.getJob(jobRef); - } catch (InterruptedException | IOException e) { - throw new RuntimeException(e); - } - }, - maxRetryJobs, - jobIdPrefix); + new PendingJob( + // Function to load the data. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + LOG.info( + "Loading {} files into {} using job {}, job id iteration {}", + gcsUris.size(), + ref, + jobRef, + jobId.getRetryIndex()); + try { + jobService.startLoadJob(jobRef, loadConfig); + } catch (IOException | InterruptedException e) { + LOG.warn("Load job {} failed with {}", jobRef, e.toString()); + throw new RuntimeException(e); + } + return null; + }, + // Function to poll the result of a load job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + // Function to lookup a job. 
+ jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.getJob(jobRef); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }, + maxRetryJobs, + jobIdPrefix); return retryJob; } From b65d8c66a1c0e8b528696e615435562514907353 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Wed, 24 Dec 2025 16:01:01 +0530 Subject: [PATCH 6/8] change --- .../beam/io/debezium/DebeziumReadSchemaTransformProvider.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index d85bb1a7dc54..d5f3f98f3b5e 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.schemas.Schema; From 0f652bc26856d353ea16cd05ac41551f6c5f5a00 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Thu, 25 Dec 2025 11:52:03 +0530 Subject: [PATCH 7/8] rewriting the logic --- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 15 +++++++++++++++ .../io/gcp/bigquery/BigQueryServicesImpl.java | 8 +++++++- .../gcp/bigquery/UpdateSchemaDestination.java | 11 ++++++++--- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 11 ++++++++--- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 10 ++++++++-- .../io/gcp/bigquery/BigQueryHelpersTest.java | 17 ++++++++++------- 6 files changed, 56 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 36757cf61c8f..96a08a69442a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -637,6 +637,21 @@ static String getDatasetLocation( try { Dataset dataset = datasetService.getDataset(projectId, datasetId); return dataset.getLocation(); + } catch (IOException e) { + ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + if (errorExtractor.itemNotFound(e) + || e instanceof BigQueryServicesImpl.RetryExhaustedException) { + LOG.error( + "Terminal failure obtaining dataset {} in project {}. Resource missing or retries exhausted.", + datasetId, + projectId); + throw new IllegalStateException( + String.format( + "Dataset %s not found or inaccessible in project %s. 
Please ensure the dataset exists before running the pipeline.", + datasetId, projectId), + e); + } + throw e; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw e; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index f4303886c7ab..d6058a0b1847 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -200,6 +200,12 @@ public class BigQueryServicesImpl implements BigQueryServices { private static final Metadata.Key KEY_RETRY_INFO = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()); + public static class RetryExhaustedException extends IOException { + public RetryExhaustedException(String message, Throwable cause) { + super(message, cause); + } + } + @Override public JobService getJobService(BigQueryOptions options) { return new JobServiceImpl(options); @@ -1688,7 +1694,7 @@ static T executeWithRetries( LOG.info("Ignore the error and retry the request.", e); } } while (nextBackOff(sleeper, backoff)); - throw new IOException(errorMessage, lastException); + throw new RetryExhaustedException(errorMessage, lastException); } /** Identical to {@link BackOffUtils#next} but without checked IOException. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 65bb3bf11b1b..dbb5fe0eff7f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -302,9 +302,14 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( loadJobProjectId == null || loadJobProjectId.get() == null ? 
tableReference.getProjectId() : loadJobProjectId.get(); - String bqLocation = - BigQueryHelpers.getDatasetLocation( - datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); + String bqLocation; + try { + bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } BigQueryHelpers.PendingJob retryJob = new BigQueryHelpers.PendingJob( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index 061e66024e29..ec05f31e0cc6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -329,9 +329,14 @@ private BigQueryHelpers.PendingJob startCopy( new EncryptionConfiguration().setKmsKeyName(kmsKey)); } - String bqLocation = - BigQueryHelpers.getDatasetLocation( - datasetService, ref.getProjectId(), ref.getDatasetId()); + String bqLocation; + try { + bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, ref.getProjectId(), ref.getDatasetId()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } String projectId = loadJobProjectId == null || loadJobProjectId.get() == null diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 288b94ce081b..b2ce75abbbea 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -505,8 +505,14 @@ private PendingJob startLoad( loadJobProjectId == null || loadJobProjectId.get() == null ? 
ref.getProjectId() : loadJobProjectId.get(); - String bqLocation = - BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); + String bqLocation; + try { + bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, ref.getProjectId(), ref.getDatasetId()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } PendingJob retryJob = new PendingJob( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java index a6df4747c039..7efbb7874c31 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java @@ -17,8 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -285,15 +284,19 @@ public void testGetDatasetLocationWithNonExistentDataset() BigQueryServices.DatasetService mockDatasetService = mock(BigQueryServices.DatasetService.class); - IOException notFoundException = new IOException("Dataset not found"); + BigQueryServicesImpl.RetryExhaustedException retryExhaustedException = + new BigQueryServicesImpl.RetryExhaustedException( + "Retries exhausted", new IOException("cause")); when(mockDatasetService.getDataset("project", "nonexistent_dataset")) - .thenThrow(notFoundException); + .thenThrow(retryExhaustedException); try { BigQueryHelpers.getDatasetLocation(mockDatasetService, "project", "nonexistent_dataset"); - fail("Expected IOException to be thrown"); - } catch (IOException e) { - assertEquals("Dataset not found", e.getMessage()); + fail("Expected IllegalStateException to be thrown"); + } catch (IllegalStateException e) { + assertTrue( + e.getMessage().contains("not found") + || e.getMessage().contains("not found or inaccessible")); // Verify that getDataset was called only once (the IOException is not wrapped and re-thrown) verify(mockDatasetService, times(1)).getDataset("project", "nonexistent_dataset"); } From 008db15500623ed92f841c78647341f13d851ec7 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Thu, 25 Dec 2025 12:18:42 +0530 Subject: [PATCH 8/8] import fixes --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java index 7efbb7874c31..26b2799ba70d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java @@ -17,7 +17,9 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static 
org.mockito.Mockito.verify;
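
---
For reference, a minimal companion sketch (not part of the patch series above) of the non-terminal path in the reworked BigQueryHelpers.getDatasetLocation: an IOException that ApiErrorExtractor does not classify as "item not found" and that is not a BigQueryServicesImpl.RetryExhaustedException should propagate unchanged, so the caller's existing retry handling still applies. The test name and the "backend unavailable" message are illustrative only; the mocks and static imports are assumed to be the same ones already used in BigQueryHelpersTest.

    @Test
    public void testGetDatasetLocationPropagatesOtherIOExceptions() throws Exception {
      BigQueryServices.DatasetService mockDatasetService =
          mock(BigQueryServices.DatasetService.class);
      // A generic IOException: neither a 404 "not found" error nor a RetryExhaustedException.
      IOException transientFailure = new IOException("backend unavailable");
      when(mockDatasetService.getDataset("project", "some_dataset")).thenThrow(transientFailure);

      try {
        BigQueryHelpers.getDatasetLocation(mockDatasetService, "project", "some_dataset");
        fail("Expected IOException to be thrown");
      } catch (IOException e) {
        // The helper rethrows the original exception without wrapping it,
        // leaving retry decisions to the caller.
        assertEquals("backend unavailable", e.getMessage());
        verify(mockDatasetService, times(1)).getDataset("project", "some_dataset");
      }
    }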