diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 129c8314fc80..96a08a69442a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -632,20 +632,37 @@ static void verifyDatasetPresence(DatasetService datasetService, TableReference
   }
 
+  /**
+   * Returns the location of the given dataset.
+   *
+   * @throws IllegalStateException if the dataset is not found or the lookup exhausted its retries
+   * @throws IOException if the lookup fails for any other reason
+   * @throws InterruptedException if the lookup is interrupted; the interrupt flag is restored
+   */
   static String getDatasetLocation(
-      DatasetService datasetService, String projectId, String datasetId) {
-    Dataset dataset;
+      DatasetService datasetService, String projectId, String datasetId)
+      throws IOException, InterruptedException {
     try {
-      dataset = datasetService.getDataset(projectId, datasetId);
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
+      Dataset dataset = datasetService.getDataset(projectId, datasetId);
+      return dataset.getLocation();
+    } catch (IOException e) {
+      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+      if (errorExtractor.itemNotFound(e)
+          || e instanceof BigQueryServicesImpl.RetryExhaustedException) {
+        LOG.error(
+            "Terminal failure obtaining dataset {} in project {}. Resource missing or retries exhausted.",
+            datasetId,
+            projectId);
+        throw new IllegalStateException(
+            String.format(
+                "Dataset %s not found or inaccessible in project %s. Please ensure the dataset exists before running the pipeline.",
+                datasetId, projectId),
+            e);
       }
-      throw new RuntimeException(
-          String.format(
-              "unable to obtain dataset for dataset %s in project %s", datasetId, projectId),
-          e);
+      throw e;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw e;
     }
-    return dataset.getLocation();
   }
 
   static void verifyTablePresence(DatasetService datasetService, TableReference table) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index f4303886c7ab..d6058a0b1847 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -200,6 +200,13 @@ public class BigQueryServicesImpl implements BigQueryServices {
   private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
       ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
 
+  /** Thrown by {@code executeWithRetries} when its backoff loop ends without a result. */
+  public static class RetryExhaustedException extends IOException {
+    public RetryExhaustedException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   @Override
   public JobService getJobService(BigQueryOptions options) {
     return new JobServiceImpl(options);
@@ -1688,7 +1695,7 @@ static <T> T executeWithRetries(
         LOG.info("Ignore the error and retry the request.", e);
       }
     } while (nextBackOff(sleeper, backoff));
-    throw new IOException(errorMessage, lastException);
+    throw new RetryExhaustedException(errorMessage, lastException);
   }
 
   /** Identical to {@link BackOffUtils#next} but without checked IOException. */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
index 65bb3bf11b1b..dbb5fe0eff7f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
@@ -302,9 +302,14 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
         loadJobProjectId == null || loadJobProjectId.get() == null
             ? tableReference.getProjectId()
             : loadJobProjectId.get();
-    String bqLocation =
-        BigQueryHelpers.getDatasetLocation(
-            datasetService, tableReference.getProjectId(), tableReference.getDatasetId());
+    String bqLocation;
+    try {
+      bqLocation =
+          BigQueryHelpers.getDatasetLocation(
+              datasetService, tableReference.getProjectId(), tableReference.getDatasetId());
+    } catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
 
     BigQueryHelpers.PendingJob retryJob =
         new BigQueryHelpers.PendingJob(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 061e66024e29..ec05f31e0cc6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -329,9 +329,14 @@ private BigQueryHelpers.PendingJob startCopy(
           new EncryptionConfiguration().setKmsKeyName(kmsKey));
     }
 
-    String bqLocation =
-        BigQueryHelpers.getDatasetLocation(
-            datasetService, ref.getProjectId(), ref.getDatasetId());
+    String bqLocation;
+    try {
+      bqLocation =
+          BigQueryHelpers.getDatasetLocation(
+              datasetService, ref.getProjectId(), ref.getDatasetId());
+    } catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
 
     String projectId =
         loadJobProjectId == null || loadJobProjectId.get() == null
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 288b94ce081b..b2ce75abbbea 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -505,8 +505,14 @@ private PendingJob startLoad(
         loadJobProjectId == null || loadJobProjectId.get() == null
             ? ref.getProjectId()
             : loadJobProjectId.get();
-    String bqLocation =
-        BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId());
+    String bqLocation;
+    try {
+      bqLocation =
+          BigQueryHelpers.getDatasetLocation(
+              datasetService, ref.getProjectId(), ref.getDatasetId());
+    } catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
 
     PendingJob retryJob =
         new PendingJob(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
index 0d36d7bb46d0..26b2799ba70d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
@@ -18,15 +18,23 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.api.client.util.Data;
 import com.google.api.services.bigquery.model.Clustering;
+import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Optional;
 import java.util.Random;
@@ -271,4 +279,43 @@ public void testClusteringJsonConversion() {
 
     assertEquals(clustering, BigQueryHelpers.clusteringFromJsonFields(jsonClusteringFields));
   }
+
+  @Test
+  public void testGetDatasetLocationWithNonExistentDataset()
+      throws IOException, InterruptedException {
+    BigQueryServices.DatasetService mockDatasetService =
+        mock(BigQueryServices.DatasetService.class);
+
+    BigQueryServicesImpl.RetryExhaustedException retryExhaustedException =
+        new BigQueryServicesImpl.RetryExhaustedException(
+            "Retries exhausted", new IOException("cause"));
+    when(mockDatasetService.getDataset("project", "nonexistent_dataset"))
+        .thenThrow(retryExhaustedException);
+
+    try {
+      BigQueryHelpers.getDatasetLocation(mockDatasetService, "project", "nonexistent_dataset");
+      fail("Expected IllegalStateException to be thrown");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("not found or inaccessible"));
+      // The original failure must be preserved as the cause of the terminal error.
+      assertEquals(retryExhaustedException, e.getCause());
+      // getDatasetLocation must not retry on its own, so exactly one lookup is expected.
+      verify(mockDatasetService, times(1)).getDataset("project", "nonexistent_dataset");
+    }
+  }
+
+  @Test
+  public void testGetDatasetLocationWithValidDataset() throws IOException, InterruptedException {
+    BigQueryServices.DatasetService mockDatasetService =
+        mock(BigQueryServices.DatasetService.class);
+    Dataset mockDataset = new Dataset().setLocation("US");
+
+    when(mockDatasetService.getDataset("project", "existing_dataset")).thenReturn(mockDataset);
+
+    String location =
+        BigQueryHelpers.getDatasetLocation(mockDatasetService, "project", "existing_dataset");
+
+    assertEquals("US", location);
+    verify(mockDatasetService, times(1)).getDataset("project", "existing_dataset");
+  }
 }