Skip to content

Commit 8ec7517

Browse files
committed
Code Cleanup BigQueryUtil getBigQueryTable
1 parent 6db2c69 commit 8ec7517

16 files changed

+49
-99
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ private void validateArgumentsColumns(String argumentsColumns, FailureCollector
183183

184184
public QueryJobConfiguration getQueryJobConfiguration(FailureCollector collector) {
185185
Table sourceTable = BigQueryUtil.getBigQueryTable(getDatasetProject(), dataset, table, getServiceAccount(),
186-
isServiceAccountFilePath(), collector);
186+
isServiceAccountFilePath(), collector, null);
187187

188188
if (sourceTable == null) {
189189
// Table does not exist

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryMultiSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ protected void configureOutputSchemas(BatchSinkContext context,
128128

129129
Table table = BigQueryUtil.getBigQueryTable(
130130
config.getDatasetProject(), config.getDataset(), tableName, config.getServiceAccount(),
131-
config.isServiceAccountFilePath(), collector);
131+
config.isServiceAccountFilePath(), collector, null);
132132

133133
Schema tableSchema = configuredSchema;
134134
if (table != null) {

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ protected void prepareRunInternal(BatchSinkContext context, BigQuery bigQuery, S
138138
configureBigQuerySink();
139139
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), config.getTable(),
140140
config.getServiceAccount(), config.isServiceAccountFilePath(),
141-
collector);
141+
collector, null);
142142
initOutput(context, bigQuery, config.getReferenceName(),
143143
BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), config.getTable()),
144144
config.getTable(), outputSchema, bucket, collector, null, table);
@@ -346,7 +346,7 @@ private void configureTable(Schema schema) {
346346
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(),
347347
config.getTable(),
348348
config.getServiceAccount(),
349-
config.isServiceAccountFilePath());
349+
config.isServiceAccountFilePath(), null, null);
350350
baseConfiguration.setBoolean(BigQueryConstants.CONFIG_DESTINATION_TABLE_EXISTS, table != null);
351351
List<String> tableFieldsNames = null;
352352
if (table != null) {
@@ -372,7 +372,7 @@ private void validateConfiguredSchema(Schema schema, FailureCollector collector)
372372
String tableName = config.getTable();
373373
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), tableName,
374374
config.getServiceAccount(), config.isServiceAccountFilePath(),
375-
collector);
375+
collector, null);
376376

377377
if (table != null && !config.containsMacro(AbstractBigQuerySinkConfig.NAME_UPDATE_SCHEMA)) {
378378
// if table already exists, validate schema against underlying bigquery table

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.cloud.kms.v1.CryptoKeyName;
2525
import com.google.common.annotations.VisibleForTesting;
2626
import com.google.common.base.Strings;
27-
import com.google.common.collect.ImmutableMap;
2827
import com.google.common.collect.ImmutableSet;
2928
import io.cdap.cdap.api.annotation.Description;
3029
import io.cdap.cdap.api.annotation.Macro;
@@ -41,7 +40,6 @@
4140

4241
import java.io.IOException;
4342
import java.util.Arrays;
44-
import java.util.HashSet;
4543
import java.util.List;
4644
import java.util.Map;
4745
import java.util.Objects;
@@ -419,7 +417,7 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect
419417
}
420418

421419
Table table = BigQueryUtil.getBigQueryTable(project, dataset, tableName, serviceAccount,
422-
isServiceAccountFilePath(), collector);
420+
isServiceAccountFilePath(), collector, null);
423421
if (table != null) {
424422
StandardTableDefinition tableDefinition = table.getDefinition();
425423
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import io.cdap.cdap.api.data.schema.Schema;
4141
import io.cdap.cdap.api.dataset.lib.KeyValue;
4242
import io.cdap.cdap.api.exception.ErrorType;
43-
import io.cdap.cdap.api.exception.ProgramFailureException;
4443
import io.cdap.cdap.etl.api.Emitter;
4544
import io.cdap.cdap.etl.api.FailureCollector;
4645
import io.cdap.cdap.etl.api.PipelineConfigurer;
@@ -50,7 +49,6 @@
5049
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
5150
import io.cdap.cdap.etl.api.connector.Connector;
5251
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
53-
import io.cdap.cdap.etl.api.exception.ErrorContext;
5452
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
5553
import io.cdap.cdap.etl.api.validation.ValidationFailure;
5654
import io.cdap.plugin.common.Asset;
@@ -310,7 +308,7 @@ private com.google.cloud.bigquery.Schema getBQSchema(FailureCollector collector)
310308
String project = config.getDatasetProject();
311309

312310
Table table = BigQueryUtil.getBigQueryTable(project, dataset, tableName, serviceAccount,
313-
config.isServiceAccountFilePath(), collector);
311+
config.isServiceAccountFilePath(), collector, null);
314312
if (table == null) {
315313
// Table does not exist
316314
collector.addFailure(String.format("BigQuery table '%s:%s.%s' does not exist.", project, dataset, tableName),
@@ -343,7 +341,7 @@ private void validatePartitionProperties(FailureCollector collector) {
343341
String dataset = config.getDataset();
344342
String tableName = config.getTable();
345343
Table sourceTable = BigQueryUtil.getBigQueryTable(project, dataset, tableName, config.getServiceAccount(),
346-
config.isServiceAccountFilePath(), collector);
344+
config.isServiceAccountFilePath(), collector, null);
347345
if (sourceTable == null) {
348346
return;
349347
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ private void validatePartitionDate(FailureCollector collector, String partitionD
238238
public Type getSourceTableType() {
239239
Table sourceTable =
240240
BigQueryUtil.getBigQueryTable(getDatasetProject(), getDataset(), table, getServiceAccount(),
241-
isServiceAccountFilePath());
241+
isServiceAccountFilePath(), null, null);
242242
return sourceTable != null ? sourceTable.getDefinition().getType() : null;
243243
}
244244

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
134134
String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null);
135135

136136
com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable(
137-
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath);
137+
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null, null);
138138
Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();
139139

140140
String query;
@@ -171,9 +171,9 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
171171
return null;
172172
}
173173
String queryTemplate = "select * from `%s` where %s";
174-
com.google.cloud.bigquery.Table sourceTable = BigQueryUtil.getBigQueryTable(datasetProject, dataset, table,
175-
serviceAccount,
176-
isServiceAccountFilePath);
174+
com.google.cloud.bigquery.Table sourceTable =
175+
BigQueryUtil.getBigQueryTable(datasetProject, dataset, table, serviceAccount, isServiceAccountFilePath, null,
176+
null);
177177
StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
178178
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
179179
if (timePartitioning == null && filter == null) {

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 19 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
4040
import io.cdap.cdap.etl.api.validation.InvalidStageException;
4141
import io.cdap.cdap.etl.api.validation.ValidationFailure;
42-
import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySinkConfig;
4342
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySink;
4443
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
4544
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
@@ -569,59 +568,6 @@ public static ValidationFailure validateArraySchema(Schema arraySchema, String n
569568
return null;
570569
}
571570

572-
/**
573-
* Get BigQuery table.
574-
*
575-
* @param datasetProject project where dataset is in
576-
* @param datasetId BigQuery dataset ID
577-
* @param tableName BigQuery table name
578-
* @param serviceAccount service account file path or JSON content
579-
* @param isServiceAccountFilePath indicator for whether service account is file or json
580-
* @return BigQuery table
581-
*/
582-
@Nullable
583-
public static Table getBigQueryTable(String datasetProject, String datasetId, String tableName,
584-
@Nullable String serviceAccount, boolean isServiceAccountFilePath) {
585-
TableId tableId = TableId.of(datasetProject, datasetId, tableName);
586-
587-
com.google.auth.Credentials credentials = null;
588-
if (serviceAccount != null) {
589-
try {
590-
credentials = GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath);
591-
} catch (IOException e) {
592-
throw new InvalidConfigPropertyException(
593-
String.format("Unable to load credentials from %s", isServiceAccountFilePath ? serviceAccount : " JSON."),
594-
"serviceFilePath");
595-
}
596-
}
597-
BigQuery bigQuery = GCPUtils.getBigQuery(datasetProject, credentials, null);
598-
599-
Table table;
600-
try {
601-
table = bigQuery.getTable(tableId);
602-
} catch (BigQueryException e) {
603-
throw new InvalidStageException("Unable to get details about the BigQuery table: " + e.getMessage(), e);
604-
}
605-
606-
return table;
607-
}
608-
609-
/**
610-
* Get BigQuery table.
611-
*
612-
* @param projectId BigQuery project ID
613-
* @param datasetId BigQuery dataset ID
614-
* @param tableName BigQuery table name
615-
* @param serviceAccountPath service account file path
616-
* @param collector failure collector
617-
* @return BigQuery table
618-
*/
619-
@Nullable
620-
public static Table getBigQueryTable(String projectId, String datasetId, String tableName,
621-
@Nullable String serviceAccountPath, FailureCollector collector) {
622-
return getBigQueryTable(projectId, datasetId, tableName, serviceAccountPath, true, collector);
623-
}
624-
625571
/**
626572
* Get BigQuery table.
627573
*
@@ -631,35 +577,43 @@ public static Table getBigQueryTable(String projectId, String datasetId, String
631577
* @param serviceAccount service account file path or JSON content
632578
* @param isServiceAccountFilePath indicator for whether service account is file or json
633579
* @param collector failure collector
580+
* @param readTimeoutSeconds http read time out in seconds
634581
* @return BigQuery table
635582
*/
636583
public static Table getBigQueryTable(String projectId, String dataset, String tableName,
637584
@Nullable String serviceAccount, @Nullable Boolean isServiceAccountFilePath,
638-
FailureCollector collector) {
585+
@Nullable FailureCollector collector, @Nullable Integer readTimeoutSeconds) {
639586
TableId tableId = TableId.of(projectId, dataset, tableName);
640587
com.google.auth.Credentials credentials = null;
641588
if (serviceAccount != null) {
642589
try {
643590
credentials = GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath);
644591
} catch (IOException e) {
645-
collector.addFailure(String.format("Unable to load credentials from %s.",
646-
isServiceAccountFilePath ? serviceAccount : "provided JSON key"),
647-
"Ensure the service account file is available on the local filesystem.")
648-
.withConfigProperty(GCPConfig.NAME_SERVICE_ACCOUNT_FILE_PATH);
649-
throw collector.getOrThrowException();
592+
if (collector != null) {
593+
collector.addFailure(String.format("Unable to load credentials from %s.",
594+
isServiceAccountFilePath ? serviceAccount : "provided JSON key"),
595+
"Ensure the service account file is available on the local filesystem.")
596+
.withConfigProperty(GCPConfig.NAME_SERVICE_ACCOUNT_FILE_PATH);
597+
throw collector.getOrThrowException();
598+
}
599+
throw new InvalidConfigPropertyException(
600+
String.format("Unable to load credentials from %s", isServiceAccountFilePath ? serviceAccount : " JSON."),
601+
"serviceFilePath");
650602
}
651603
}
652-
BigQuery bigQuery = GCPUtils.getBigQuery(projectId, credentials, null);
604+
BigQuery bigQuery = GCPUtils.getBigQuery(projectId, credentials, readTimeoutSeconds);
653605

654606
Table table = null;
655607
try {
656608
table = bigQuery.getTable(tableId);
657609
} catch (BigQueryException e) {
658-
collector.addFailure("Unable to get details about the BigQuery table: " + e.getMessage(), null)
659-
.withConfigProperty(BigQuerySourceConfig.NAME_TABLE);
660-
throw collector.getOrThrowException();
610+
if (collector != null) {
611+
collector.addFailure("Unable to get details about the BigQuery table: " + e.getMessage(), null)
612+
.withConfigProperty(BigQuerySourceConfig.NAME_TABLE);
613+
throw collector.getOrThrowException();
614+
}
615+
throw new InvalidStageException("Unable to get details about the BigQuery table: " + e.getMessage(), e);
661616
}
662-
663617
return table;
664618
}
665619

src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ private void configureTable(Schema schema, String dataset, String datasetProject
355355
config.getTable(),
356356
config.getServiceAccount(),
357357
config.isServiceAccountFilePath(),
358-
collector);
358+
collector, null);
359359
baseConfiguration.setBoolean(BigQueryConstants.CONFIG_DESTINATION_TABLE_EXISTS, table != null);
360360
List<String> tableFieldsNames = null;
361361
if (table != null) {

src/main/java/io/cdap/plugin/gcp/dataplex/sink/config/DataplexBatchSinkConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ private void validateConfiguredSchema(Schema schema, FailureCollector collector,
556556
String tableName = this.getTable();
557557
Table table = BigQueryUtil.getBigQueryTable(this.tryGetProject(), dataset, tableName,
558558
connection.getServiceAccount(), connection.isServiceAccountFilePath(),
559-
collector);
559+
collector, null);
560560

561561
if (table != null && !this.containsMacro(NAME_UPDATE_SCHEMA)) {
562562
// if table already exists, validate schema against underlying bigquery table
@@ -586,7 +586,7 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect
586586
}
587587

588588
Table table = BigQueryUtil.getBigQueryTable(project, dataset, tableName, serviceAccount,
589-
isServiceAccountFilePath(), collector);
589+
isServiceAccountFilePath(), collector, null);
590590
if (table != null) {
591591
StandardTableDefinition tableDefinition = table.getDefinition();
592592
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();

0 commit comments

Comments
 (0)