Skip to content

Commit e396726

Browse files
Committed: "Add Http readTimeout BQ Source/Sink"
1 parent 6db2c69 · commit e396726

25 files changed

+109
-88
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ private void validateArgumentsColumns(String argumentsColumns, FailureCollector
183183

184184
public QueryJobConfiguration getQueryJobConfiguration(FailureCollector collector) {
185185
Table sourceTable = BigQueryUtil.getBigQueryTable(getDatasetProject(), dataset, table, getServiceAccount(),
186-
isServiceAccountFilePath(), collector);
186+
isServiceAccountFilePath(), collector, null);
187187

188188
if (sourceTable == null) {
189189
// Table does not exist

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import io.cdap.cdap.api.annotation.Name;
4646
import io.cdap.cdap.api.annotation.Plugin;
4747
import io.cdap.cdap.api.exception.ErrorCategory;
48-
import io.cdap.cdap.api.exception.ErrorCodeType;
4948
import io.cdap.cdap.api.exception.ErrorType;
5049
import io.cdap.cdap.api.exception.ErrorUtils;
5150
import io.cdap.cdap.etl.api.FailureCollector;
@@ -364,7 +363,6 @@ public static final class Config extends AbstractBigQueryActionConfig {
364363
public static final int DEFAULT_MAX_RETRY_COUNT = 5;
365364
// Sn = a * (1 - r^n) / (r - 1)
366365
public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L;
367-
public static final int DEFAULT_READ_TIMEOUT = 120;
368366
public static final Set<String> VALID_WRITE_PREFERENCES = Arrays.stream(JobInfo.WriteDisposition.values())
369367
.map(Enum::name).collect(Collectors.toSet());
370368

@@ -581,7 +579,7 @@ public int getMaxRetryCount() {
581579
}
582580

583581
public int getReadTimeout() {
584-
return readTimeout == null ? DEFAULT_READ_TIMEOUT : readTimeout;
582+
return readTimeout == null ? GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS : readTimeout;
585583
}
586584

587585
@Override

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
3434
import io.cdap.plugin.gcp.common.CmekUtils;
3535

36+
import io.cdap.plugin.gcp.common.GCPUtils;
3637
import java.util.Collections;
3738
import java.util.HashSet;
3839
import java.util.Map;
@@ -54,6 +55,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
5455
protected static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation";
5556
private static final String SCHEME = "gs://";
5657
protected static final String NAME_JSON_STRING_FIELDS = "jsonStringFields";
58+
private static final String NAME_READ_TIMEOUT = "readTimeout";
5759

5860
@Name(Constants.Reference.REFERENCE_NAME)
5961
@Nullable
@@ -100,6 +102,16 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
100102
"The schema of these fields should be of type STRING.")
101103
protected String jsonStringFields;
102104

105+
@Name(NAME_READ_TIMEOUT)
106+
@Nullable
107+
@Macro
108+
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
109+
private Integer readTimeout;
110+
111+
public int getReadTimeout() {
112+
return readTimeout == null ? GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS : readTimeout;
113+
}
114+
103115
public AbstractBigQuerySinkConfig(BigQueryConnectorConfig connection, String dataset, String cmekKey, String bucket) {
104116
super(connection, dataset, cmekKey, bucket);
105117
}

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryMultiSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ protected void configureOutputSchemas(BatchSinkContext context,
128128

129129
Table table = BigQueryUtil.getBigQueryTable(
130130
config.getDatasetProject(), config.getDataset(), tableName, config.getServiceAccount(),
131-
config.isServiceAccountFilePath(), collector);
131+
config.isServiceAccountFilePath(), collector, config.getReadTimeout());
132132

133133
Schema tableSchema = configuredSchema;
134134
if (table != null) {

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryMultiSinkConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.cdap.cdap.api.annotation.Macro;
2121
import io.cdap.cdap.api.annotation.Name;
2222
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
23+
import io.cdap.plugin.gcp.common.GCPUtils;
2324

2425
import javax.annotation.Nullable;
2526

@@ -30,6 +31,7 @@ public class BigQueryMultiSinkConfig extends AbstractBigQuerySinkConfig {
3031

3132
private static final String SPLIT_FIELD_DEFAULT = "tablename";
3233
private static final String NAME_ALLOW_FLEXIBLE_SCHEMA = "allowFlexibleSchema";
34+
private static final String NAME_READ_TIMEOUT = "readTimeout";
3335

3436
@Macro
3537
@Nullable

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ protected void prepareRunInternal(BatchSinkContext context, BigQuery bigQuery, S
138138
configureBigQuerySink();
139139
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), config.getTable(),
140140
config.getServiceAccount(), config.isServiceAccountFilePath(),
141-
collector);
141+
collector, config.getReadTimeout());
142142
initOutput(context, bigQuery, config.getReferenceName(),
143143
BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), config.getTable()),
144144
config.getTable(), outputSchema, bucket, collector, null, table);
@@ -343,10 +343,8 @@ private void configureBigQuerySink() {
343343
*/
344344
private void configureTable(Schema schema) {
345345
AbstractBigQuerySinkConfig config = getConfig();
346-
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(),
347-
config.getTable(),
348-
config.getServiceAccount(),
349-
config.isServiceAccountFilePath());
346+
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), config.getTable(),
347+
config.getServiceAccount(), config.isServiceAccountFilePath(), null, config.getReadTimeout());
350348
baseConfiguration.setBoolean(BigQueryConstants.CONFIG_DESTINATION_TABLE_EXISTS, table != null);
351349
List<String> tableFieldsNames = null;
352350
if (table != null) {
@@ -372,7 +370,7 @@ private void validateConfiguredSchema(Schema schema, FailureCollector collector)
372370
String tableName = config.getTable();
373371
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), tableName,
374372
config.getServiceAccount(), config.isServiceAccountFilePath(),
375-
collector);
373+
collector, config.getReadTimeout());
376374

377375
if (table != null && !config.containsMacro(AbstractBigQuerySinkConfig.NAME_UPDATE_SCHEMA)) {
378376
// if table already exists, validate schema against underlying bigquery table

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.cloud.kms.v1.CryptoKeyName;
2525
import com.google.common.annotations.VisibleForTesting;
2626
import com.google.common.base.Strings;
27-
import com.google.common.collect.ImmutableMap;
2827
import com.google.common.collect.ImmutableSet;
2928
import io.cdap.cdap.api.annotation.Description;
3029
import io.cdap.cdap.api.annotation.Macro;
@@ -41,7 +40,6 @@
4140

4241
import java.io.IOException;
4342
import java.util.Arrays;
44-
import java.util.HashSet;
4543
import java.util.List;
4644
import java.util.Map;
4745
import java.util.Objects;
@@ -419,7 +417,7 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect
419417
}
420418

421419
Table table = BigQueryUtil.getBigQueryTable(project, dataset, tableName, serviceAccount,
422-
isServiceAccountFilePath(), collector);
420+
isServiceAccountFilePath(), collector, getReadTimeout());
423421
if (table != null) {
424422
StandardTableDefinition tableDefinition = table.getDefinition();
425423
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import io.cdap.cdap.api.data.schema.Schema;
4141
import io.cdap.cdap.api.dataset.lib.KeyValue;
4242
import io.cdap.cdap.api.exception.ErrorType;
43-
import io.cdap.cdap.api.exception.ProgramFailureException;
4443
import io.cdap.cdap.etl.api.Emitter;
4544
import io.cdap.cdap.etl.api.FailureCollector;
4645
import io.cdap.cdap.etl.api.PipelineConfigurer;
@@ -50,7 +49,6 @@
5049
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
5150
import io.cdap.cdap.etl.api.connector.Connector;
5251
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
53-
import io.cdap.cdap.etl.api.exception.ErrorContext;
5452
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
5553
import io.cdap.cdap.etl.api.validation.ValidationFailure;
5654
import io.cdap.plugin.common.Asset;
@@ -262,6 +260,7 @@ private void configureBigQuerySource() {
262260
if (config.getViewMaterializationDataset() != null) {
263261
configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset());
264262
}
263+
configuration.set(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT, String.valueOf(config.getReadTimeout()));
265264
}
266265

267266
public Schema getSchema(FailureCollector collector) {
@@ -310,7 +309,7 @@ private com.google.cloud.bigquery.Schema getBQSchema(FailureCollector collector)
310309
String project = config.getDatasetProject();
311310

312311
Table table = BigQueryUtil.getBigQueryTable(project, dataset, tableName, serviceAccount,
313-
config.isServiceAccountFilePath(), collector);
312+
config.isServiceAccountFilePath(), collector, config.getReadTimeout());
314313
if (table == null) {
315314
// Table does not exist
316315
collector.addFailure(String.format("BigQuery table '%s:%s.%s' does not exist.", project, dataset, tableName),
@@ -343,7 +342,7 @@ private void validatePartitionProperties(FailureCollector collector) {
343342
String dataset = config.getDataset();
344343
String tableName = config.getTable();
345344
Table sourceTable = BigQueryUtil.getBigQueryTable(project, dataset, tableName, config.getServiceAccount(),
346-
config.isServiceAccountFilePath(), collector);
345+
config.isServiceAccountFilePath(), collector, config.getReadTimeout());
347346
if (sourceTable == null) {
348347
return;
349348
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
5858
private static final String VALID_DATE_FORMAT = "yyyy-MM-dd";
5959
private static final String SCHEME = "gs://";
6060
private static final String WHERE = "WHERE";
61+
private static final String NAME_READ_TIMEOUT = "readTimeout";
6162
public static final Set<Schema.Type> SUPPORTED_TYPES =
6263
ImmutableSet.of(Schema.Type.LONG, Schema.Type.STRING, Schema.Type.DOUBLE, Schema.Type.BOOLEAN, Schema.Type.BYTES,
6364
Schema.Type.ARRAY, Schema.Type.RECORD);
@@ -131,6 +132,12 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
131132
+ "Defaults to the same dataset in which the table is located.")
132133
private String viewMaterializationDataset;
133134

135+
@Name(NAME_READ_TIMEOUT)
136+
@Nullable
137+
@Macro
138+
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
139+
private Integer readTimeout;
140+
134141
public String getTable() {
135142
return table;
136143
}
@@ -142,6 +149,10 @@ public String getDatasetProject() {
142149
return connection.getDatasetProject();
143150
}
144151

152+
public int getReadTimeout() {
153+
return readTimeout == null ? GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS : readTimeout;
154+
}
155+
145156
public void validate(FailureCollector collector) {
146157
validate(collector, Collections.emptyMap());
147158
}
@@ -238,7 +249,7 @@ private void validatePartitionDate(FailureCollector collector, String partitionD
238249
public Type getSourceTableType() {
239250
Table sourceTable =
240251
BigQueryUtil.getBigQueryTable(getDatasetProject(), getDataset(), table, getServiceAccount(),
241-
isServiceAccountFilePath());
252+
isServiceAccountFilePath(), null, getReadTimeout());
242253
return sourceTable != null ? sourceTable.getDefinition().getType() : null;
243254
}
244255

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,11 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
132132
String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null);
133133
String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null);
134134
String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null);
135+
Integer readTimeout = configuration.getInt(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT,
136+
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
135137

136138
com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable(
137-
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath);
139+
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null, readTimeout);
138140
Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();
139141

140142
String query;
@@ -173,7 +175,8 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
173175
String queryTemplate = "select * from `%s` where %s";
174176
com.google.cloud.bigquery.Table sourceTable = BigQueryUtil.getBigQueryTable(datasetProject, dataset, table,
175177
serviceAccount,
176-
isServiceAccountFilePath);
178+
isServiceAccountFilePath,
179+
null, null);
177180
StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
178181
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
179182
if (timePartitioning == null && filter == null) {

0 commit comments

Comments (0)