Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.FailureCollector;
Expand Down Expand Up @@ -364,7 +363,6 @@ public static final class Config extends AbstractBigQueryActionConfig {
public static final int DEFAULT_MAX_RETRY_COUNT = 5;
// Sum of a geometric series: Sn = a * (r^n - 1) / (r - 1)
public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L;
public static final int DEFAULT_READ_TIMEOUT = 120;
public static final Set<String> VALID_WRITE_PREFERENCES = Arrays.stream(JobInfo.WriteDisposition.values())
.map(Enum::name).collect(Collectors.toSet());

Expand Down Expand Up @@ -581,7 +579,7 @@ public int getMaxRetryCount() {
}

public int getReadTimeout() {
return readTimeout == null ? DEFAULT_READ_TIMEOUT : readTimeout;
return readTimeout == null ? GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS : readTimeout;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
Dataset dataset;
try {
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
bigQuery = GCPUtils.getBigQuery(project, credentials, config.getReadTimeout());
dataset = bigQuery.getDataset(datasetId);
} catch (Exception e) {
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;

import io.cdap.plugin.gcp.common.GCPUtils;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -54,6 +55,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
protected static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation";
private static final String SCHEME = "gs://";
protected static final String NAME_JSON_STRING_FIELDS = "jsonStringFields";
private static final String NAME_READ_TIMEOUT = "readTimeout";

@Name(Constants.Reference.REFERENCE_NAME)
@Nullable
Expand Down Expand Up @@ -100,6 +102,16 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
"The schema of these fields should be of type STRING.")
protected String jsonStringFields;

@Name(NAME_READ_TIMEOUT)
@Nullable
@Macro
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
private Integer readTimeout;

/**
 * Returns the configured read timeout, falling back to
 * {@link GCPUtils#BQ_DEFAULT_READ_TIMEOUT_SECONDS} when the property was left unset.
 *
 * @return the value of {@code readTimeout} if it is non-null, otherwise the shared default
 */
public int getReadTimeout() {
  if (readTimeout != null) {
    return readTimeout;
  }
  return GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS;
}

/**
 * Creates the sink configuration by forwarding every argument, unchanged and in the same order,
 * to the superclass constructor. No additional state is initialised here.
 *
 * @param connection passed through to the superclass
 * @param dataset passed through to the superclass
 * @param cmekKey passed through to the superclass
 * @param bucket passed through to the superclass
 */
public AbstractBigQuerySinkConfig(BigQueryConnectorConfig connection, String dataset, String cmekKey, String bucket) {
super(connection, dataset, cmekKey, bucket);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ protected void configureOutputSchemas(BatchSinkContext context,

Table table = BigQueryUtil.getBigQueryTable(
config.getDatasetProject(), config.getDataset(), tableName, config.getServiceAccount(),
config.isServiceAccountFilePath(), collector, null);
config.isServiceAccountFilePath(), collector, config.getReadTimeout());

Schema tableSchema = configuredSchema;
if (table != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
import io.cdap.plugin.gcp.common.GCPUtils;

import javax.annotation.Nullable;

Expand All @@ -30,6 +31,7 @@ public class BigQueryMultiSinkConfig extends AbstractBigQuerySinkConfig {

private static final String SPLIT_FIELD_DEFAULT = "tablename";
private static final String NAME_ALLOW_FLEXIBLE_SCHEMA = "allowFlexibleSchema";
private static final String NAME_READ_TIMEOUT = "readTimeout";

@Macro
@Nullable
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ protected void prepareRunInternal(BatchSinkContext context, BigQuery bigQuery, S
configureBigQuerySink();
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), config.getTable(),
config.getServiceAccount(), config.isServiceAccountFilePath(),
collector, null);
collector, config.getReadTimeout());
initOutput(context, bigQuery, config.getReferenceName(),
BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), config.getTable()),
config.getTable(), outputSchema, bucket, collector, null, table);
Expand Down Expand Up @@ -343,10 +343,8 @@ private void configureBigQuerySink() {
*/
private void configureTable(Schema schema) {
AbstractBigQuerySinkConfig config = getConfig();
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(),
config.getTable(),
config.getServiceAccount(),
config.isServiceAccountFilePath(), null, null);
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), config.getTable(),
config.getServiceAccount(), config.isServiceAccountFilePath(), null, config.getReadTimeout());
baseConfiguration.setBoolean(BigQueryConstants.CONFIG_DESTINATION_TABLE_EXISTS, table != null);
List<String> tableFieldsNames = null;
if (table != null) {
Expand All @@ -372,7 +370,7 @@ private void validateConfiguredSchema(Schema schema, FailureCollector collector)
String tableName = config.getTable();
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), tableName,
config.getServiceAccount(), config.isServiceAccountFilePath(),
collector, null);
collector, config.getReadTimeout());

if (table != null && !config.containsMacro(AbstractBigQuerySinkConfig.NAME_UPDATE_SCHEMA)) {
// if table already exists, validate schema against underlying bigquery table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect
}

Table table = BigQueryUtil.getBigQueryTable(project, dataset, tableName, serviceAccount,
isServiceAccountFilePath(), collector, null);
isServiceAccountFilePath(), collector, getReadTimeout());
if (table != null) {
StandardTableDefinition tableDefinition = table.getDefinition();
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private void configureBigQuerySource() {
if (config.getViewMaterializationDataset() != null) {
configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset());
}
configuration.set(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT, String.valueOf(config.getReadTimeout()));
}

public Schema getSchema(FailureCollector collector) {
Expand Down Expand Up @@ -308,7 +309,7 @@ private com.google.cloud.bigquery.Schema getBQSchema(FailureCollector collector)
String project = config.getDatasetProject();

Table table = BigQueryUtil.getBigQueryTable(project, dataset, tableName, serviceAccount,
config.isServiceAccountFilePath(), collector, null);
config.isServiceAccountFilePath(), collector, config.getReadTimeout());
if (table == null) {
// Table does not exist
collector.addFailure(String.format("BigQuery table '%s:%s.%s' does not exist.", project, dataset, tableName),
Expand Down Expand Up @@ -341,7 +342,7 @@ private void validatePartitionProperties(FailureCollector collector) {
String dataset = config.getDataset();
String tableName = config.getTable();
Table sourceTable = BigQueryUtil.getBigQueryTable(project, dataset, tableName, config.getServiceAccount(),
config.isServiceAccountFilePath(), collector, null);
config.isServiceAccountFilePath(), collector, config.getReadTimeout());
if (sourceTable == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
private static final String VALID_DATE_FORMAT = "yyyy-MM-dd";
private static final String SCHEME = "gs://";
private static final String WHERE = "WHERE";
private static final String NAME_READ_TIMEOUT = "readTimeout";
public static final Set<Schema.Type> SUPPORTED_TYPES =
ImmutableSet.of(Schema.Type.LONG, Schema.Type.STRING, Schema.Type.DOUBLE, Schema.Type.BOOLEAN, Schema.Type.BYTES,
Schema.Type.ARRAY, Schema.Type.RECORD);
Expand Down Expand Up @@ -131,6 +132,12 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
+ "Defaults to the same dataset in which the table is located.")
private String viewMaterializationDataset;

@Name(NAME_READ_TIMEOUT)
@Nullable
@Macro
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
private Integer readTimeout;

/**
 * Returns the current value of the {@code table} field, without any validation or defaulting.
 *
 * @return the {@code table} field as stored on this config
 */
public String getTable() {
return table;
}
Expand All @@ -142,6 +149,10 @@ public String getDatasetProject() {
return connection.getDatasetProject();
}

/**
 * Returns the configured read timeout, falling back to
 * {@link GCPUtils#BQ_DEFAULT_READ_TIMEOUT_SECONDS} when the property was left unset.
 *
 * @return the value of {@code readTimeout} if it is non-null, otherwise the shared default
 */
public int getReadTimeout() {
  if (readTimeout != null) {
    return readTimeout;
  }
  return GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS;
}

/**
 * Convenience overload that delegates to {@code validate(collector, Collections.emptyMap())}.
 * NOTE(review): the meaning of the map argument is defined by the two-argument overload,
 * which is not shown here; this overload always supplies an empty one.
 *
 * @param collector handed on unchanged to the two-argument overload
 */
public void validate(FailureCollector collector) {
validate(collector, Collections.emptyMap());
}
Expand Down Expand Up @@ -238,7 +249,7 @@ private void validatePartitionDate(FailureCollector collector, String partitionD
public Type getSourceTableType() {
Table sourceTable =
BigQueryUtil.getBigQueryTable(getDatasetProject(), getDataset(), table, getServiceAccount(),
isServiceAccountFilePath(), null, null);
isServiceAccountFilePath(), null, getReadTimeout());
return sourceTable != null ? sourceTable.getDefinition().getType() : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,11 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null);
String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null);
String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null);
Integer readTimeout = configuration.getInt(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT,
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);

com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable(
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null, null);
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null, readTimeout);
Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();

String query;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ public interface BigQueryConstants {
String CDAP_BQ_SINK_OUTPUT_SCHEMA = "cdap.bq.sink.output.schema";
String BQ_FQN_PREFIX = "bigquery";
String CONFIG_JOB_LABEL_KEY_VALUE = "cdap.bq.sink.job.label.key.value";
String CONFIG_BQ_HTTP_READ_TIMEOUT = "cdap.bq.job.http.read.timeout";
}
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ public static Table getBigQueryTable(String projectId, String dataset, String ta
}
BigQuery bigQuery = GCPUtils.getBigQuery(projectId, credentials, readTimeoutSeconds);

Table table = null;
Table table;
try {
table = bigQuery.getTable(tableId);
} catch (BigQueryException e) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.gson.reflect.TypeToken;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.gcs.GCSPath;
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

import java.io.ByteArrayInputStream;
Expand All @@ -62,6 +65,8 @@
* GCP utility class to get service account credentials
*/
public class GCPUtils {

private static final Logger LOG = LoggerFactory.getLogger(GCPUtils.class);
public static final String FS_GS_PROJECT_ID = "fs.gs.project.id";
private static final Gson GSON = new Gson();
private static final Type SCOPES_TYPE = new TypeToken<List<String>>() { }.getType();
Expand All @@ -82,6 +87,7 @@ public class GCPUtils {
public static final String GCS_SUPPORTED_DOC_URL = "https://cloud.google.com/storage/docs/json_api/v1/status-codes";
public static final String BQ_SUPPORTED_DOC_URL = "https://cloud.google.com/bigquery/docs/error-messages";
public static final String PUBSUB_SUPPORTED_DOC_URL = "https://cloud.google.com/pubsub/docs/reference/error-codes";
public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120;

/**
* Load a service account from the local file system.
Expand Down Expand Up @@ -259,6 +265,7 @@ public static BigQuery getBigQuery(String project, @Nullable Credentials credent
}

if (readTimeout != null) {
LOG.debug("Setting read timeout to {} seconds.", readTimeout);
bigqueryBuilder.setTransportOptions(HttpTransportOptions.newBuilder()
.setReadTimeout(readTimeout * MILLISECONDS_MULTIPLIER).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.cdap.cdap.etl.api.action.ActionContext;

import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -134,7 +135,7 @@ public void testValidateRetryConfigurationWithDefaultValues() {
BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS,
BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT,
BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER,
BigQueryExecute.Config.DEFAULT_READ_TIMEOUT);
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
Assert.assertEquals(0, failureCollector.getValidationFailures().size());
}

Expand All @@ -144,7 +145,7 @@ public void testValidateRetryConfigurationWithInvalidInitialRetryDuration() {
BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS,
BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT,
BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER,
BigQueryExecute.Config.DEFAULT_READ_TIMEOUT);
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
Assert.assertEquals(1, failureCollector.getValidationFailures().size());
Assert.assertEquals("Initial retry duration must be greater than 0.",
failureCollector.getValidationFailures().get(0).getMessage());
Expand All @@ -156,7 +157,7 @@ public void testValidateRetryConfigurationWithInvalidMaxRetryDuration() {
BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, -1L,
BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT,
BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER,
BigQueryExecute.Config.DEFAULT_READ_TIMEOUT);
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
Assert.assertEquals(2, failureCollector.getValidationFailures().size());
Assert.assertEquals("Max retry duration must be greater than 0.",
failureCollector.getValidationFailures().get(0).getMessage());
Expand All @@ -170,7 +171,7 @@ public void testValidateRetryConfigurationWithInvalidRetryMultiplier() {
BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS,
BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS,
BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, -1.0,
BigQueryExecute.Config.DEFAULT_READ_TIMEOUT);
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
Assert.assertEquals(1, failureCollector.getValidationFailures().size());
Assert.assertEquals("Retry multiplier must be strictly greater than 1.",
failureCollector.getValidationFailures().get(0).getMessage());
Expand All @@ -182,7 +183,7 @@ public void testValidateRetryConfigurationWithInvalidRetryMultiplierAndMaxRetryC
BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS,
BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, -1,
BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER,
BigQueryExecute.Config.DEFAULT_READ_TIMEOUT);
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
Assert.assertEquals(1, failureCollector.getValidationFailures().size());
Assert.assertEquals("Max retry count must be greater than 0.",
failureCollector.getValidationFailures().get(0).getMessage());
Expand All @@ -194,7 +195,7 @@ public void testValidateRetryConfigurationWithMultiplierOne() {
BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS,
BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS,
BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, 1.0,
BigQueryExecute.Config.DEFAULT_READ_TIMEOUT);
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
Assert.assertEquals(1, failureCollector.getValidationFailures().size());
Assert.assertEquals("Retry multiplier must be strictly greater than 1.",
failureCollector.getValidationFailures().get(0).getMessage());
Expand All @@ -205,7 +206,7 @@ public void testValidateRetryConfigurationWithMaxRetryLessThanInitialRetry() {
config.validateRetryConfiguration(failureCollector, 10L, 5L,
BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT,
BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER,
BigQueryExecute.Config.DEFAULT_READ_TIMEOUT);
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
Assert.assertEquals(1, failureCollector.getValidationFailures().size());
Assert.assertEquals("Max retry duration must be greater than initial retry duration.",
failureCollector.getValidationFailures().get(0).getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public void testGenerateQuery() {
StandardTableDefinition tableDefinition = PowerMockito.mock(StandardTableDefinition.class);
PowerMockito.when(BigQueryUtil.getBigQueryTable(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(),
ArgumentMatchers.anyString(), ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(),
ArgumentMatchers.any()))
.thenReturn(t);
ArgumentMatchers.any())).thenReturn(t);
PowerMockito.when(t.getDefinition()).thenReturn(tableDefinition);
String generatedQuery = partitionedBigQueryInputFormat.generateQuery(null, null, filter, datasetProject,
datasetProject, dataset, table, null, true);
Expand Down
9 changes: 9 additions & 0 deletions widgets/BigQueryMultiTable-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@
}
]
}
},
{
"widget-type": "hidden",
"label": "Read Timeout",
"name": "readTimeout",
"widget-attributes": {
"default": "120",
"minimum": "0"
}
}
]
},
Expand Down
9 changes: 9 additions & 0 deletions widgets/BigQueryTable-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@
},
"default": "false"
}
},
{
"widget-type": "hidden",
"label": "Read Timeout",
"name": "readTimeout",
"widget-attributes": {
"default": "120",
"minimum": "0"
}
}
]
},
Expand Down
9 changes: 9 additions & 0 deletions widgets/BigQueryTable-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@
"widget-attributes": {
"placeholder": "projects/<gcp-project-id>/locations/<key-location>/keyRings/<key-ring-name>/cryptoKeys/<key-name>"
}
},
{
"widget-type": "hidden",
"label": "Read Timeout",
"name": "readTimeout",
"widget-attributes": {
"default": "120",
"minimum": "0"
}
}
]
},
Expand Down
Loading