Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -632,20 +632,30 @@ static void verifyDatasetPresence(DatasetService datasetService, TableReference
}

static String getDatasetLocation(
DatasetService datasetService, String projectId, String datasetId) {
Dataset dataset;
DatasetService datasetService, String projectId, String datasetId)
throws IOException, InterruptedException {
try {
dataset = datasetService.getDataset(projectId, datasetId);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
Dataset dataset = datasetService.getDataset(projectId, datasetId);
return dataset.getLocation();
} catch (IOException e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if (errorExtractor.itemNotFound(e)
|| e instanceof BigQueryServicesImpl.RetryExhaustedException) {
LOG.error(
"Terminal failure obtaining dataset {} in project {}. Resource missing or retries exhausted.",
datasetId,
projectId);
throw new IllegalStateException(
String.format(
"Dataset %s not found or inaccessible in project %s. Please ensure the dataset exists before running the pipeline.",
datasetId, projectId),
e);
}
throw new RuntimeException(
String.format(
"unable to obtain dataset for dataset %s in project %s", datasetId, projectId),
e);
throw e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
}
return dataset.getLocation();
}

static void verifyTablePresence(DatasetService datasetService, TableReference table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ public class BigQueryServicesImpl implements BigQueryServices {
private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

public static class RetryExhaustedException extends IOException {
public RetryExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}

@Override
public JobService getJobService(BigQueryOptions options) {
return new JobServiceImpl(options);
Expand Down Expand Up @@ -1688,7 +1694,7 @@ static <T> T executeWithRetries(
LOG.info("Ignore the error and retry the request.", e);
}
} while (nextBackOff(sleeper, backoff));
throw new IOException(errorMessage, lastException);
throw new RetryExhaustedException(errorMessage, lastException);
}

/** Identical to {@link BackOffUtils#next} but without checked IOException. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,14 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
loadJobProjectId == null || loadJobProjectId.get() == null
? tableReference.getProjectId()
: loadJobProjectId.get();
String bqLocation =
BigQueryHelpers.getDatasetLocation(
datasetService, tableReference.getProjectId(), tableReference.getDatasetId());
String bqLocation;
try {
bqLocation =
BigQueryHelpers.getDatasetLocation(
datasetService, tableReference.getProjectId(), tableReference.getDatasetId());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}

BigQueryHelpers.PendingJob retryJob =
new BigQueryHelpers.PendingJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,14 @@ private BigQueryHelpers.PendingJob startCopy(
new EncryptionConfiguration().setKmsKeyName(kmsKey));
}

String bqLocation =
BigQueryHelpers.getDatasetLocation(
datasetService, ref.getProjectId(), ref.getDatasetId());
String bqLocation;
try {
bqLocation =
BigQueryHelpers.getDatasetLocation(
datasetService, ref.getProjectId(), ref.getDatasetId());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}

String projectId =
loadJobProjectId == null || loadJobProjectId.get() == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,14 @@ private PendingJob startLoad(
loadJobProjectId == null || loadJobProjectId.get() == null
? ref.getProjectId()
: loadJobProjectId.get();
String bqLocation =
BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId());
String bqLocation;
try {
bqLocation =
BigQueryHelpers.getDatasetLocation(
datasetService, ref.getProjectId(), ref.getDatasetId());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}

PendingJob retryJob =
new PendingJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
Expand Down Expand Up @@ -271,4 +279,43 @@ public void testClusteringJsonConversion() {

assertEquals(clustering, BigQueryHelpers.clusteringFromJsonFields(jsonClusteringFields));
}

@Test
public void testGetDatasetLocationWithNonExistentDataset()
throws IOException, InterruptedException {
BigQueryServices.DatasetService mockDatasetService =
mock(BigQueryServices.DatasetService.class);

BigQueryServicesImpl.RetryExhaustedException retryExhaustedException =
new BigQueryServicesImpl.RetryExhaustedException(
"Retries exhausted", new IOException("cause"));
when(mockDatasetService.getDataset("project", "nonexistent_dataset"))
.thenThrow(retryExhaustedException);

try {
BigQueryHelpers.getDatasetLocation(mockDatasetService, "project", "nonexistent_dataset");
fail("Expected IllegalStateException to be thrown");
} catch (IllegalStateException e) {
assertTrue(
e.getMessage().contains("not found")
|| e.getMessage().contains("not found or inaccessible"));
// Verify that getDataset was called only once (the IOException is not wrapped and re-thrown)
verify(mockDatasetService, times(1)).getDataset("project", "nonexistent_dataset");
}
}

@Test
public void testGetDatasetLocationWithValidDataset() throws IOException, InterruptedException {
BigQueryServices.DatasetService mockDatasetService =
mock(BigQueryServices.DatasetService.class);
Dataset mockDataset = new Dataset().setLocation("US");

when(mockDatasetService.getDataset("project", "existing_dataset")).thenReturn(mockDataset);

String location =
BigQueryHelpers.getDatasetLocation(mockDatasetService, "project", "existing_dataset");

assertEquals("US", location);
verify(mockDatasetService, times(1)).getDataset("project", "existing_dataset");
}
}
Loading