Skip to content

Commit d3a9084

Browse files
authored
Merge pull request #516 from AdaptiveScale/cherrypick-release/0.15/PLUGIN-499
Cherrypick PLUGIN-499 for release/0.15
2 parents 2e65f25 + bd59700 commit d3a9084

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.bigquery.StandardTableDefinition;
2525
import com.google.cloud.bigquery.Table;
2626
import com.google.cloud.bigquery.TableDefinition.Type;
27+
import com.google.cloud.bigquery.TableId;
2728
import com.google.cloud.bigquery.TimePartitioning;
2829
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
2930
import io.cdap.cdap.api.annotation.Description;
@@ -126,8 +127,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
126127
Schema configuredSchema = getOutputSchema(collector);
127128

128129
String serviceAccount = config.getServiceAccount();
129-
Credentials credentials = serviceAccount == null ?
130-
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
130+
Credentials credentials = getCredentials(serviceAccount);
131131
BigQuery bigQuery = GCPUtils.getBigQuery(config.getDatasetProject(), credentials);
132132

133133
uuid = UUID.randomUUID();
@@ -171,6 +171,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {
171171
if (config.getViewMaterializationDataset() != null) {
172172
configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset());
173173
}
174+
String temporaryTableName = String.format("_%s_%s", config.getTable(),
175+
UUID.randomUUID().toString().replaceAll("-", "_"));
176+
configuration.set(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME, temporaryTableName);
174177

175178
String temporaryGcsPath = String.format("gs://%s/%s/hadoop/input/%s", bucket, uuid, uuid);
176179
PartitionedBigQueryInputFormat.setTemporaryCloudStorageDirectory(configuration, temporaryGcsPath);
@@ -211,6 +214,24 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
211214

212215
@Override
213216
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
217+
deleteGcsTemporaryDirectory();
218+
deleteBigQueryTemporaryTable();
219+
}
220+
221+
private void deleteBigQueryTemporaryTable() {
222+
String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
223+
try {
224+
String serviceAccount = config.getServiceAccount();
225+
Credentials credentials = getCredentials(serviceAccount);
226+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getDatasetProject(), credentials);
227+
bigQuery.delete(TableId.of(config.getProject(), config.getDataset(), temporaryTable));
228+
LOG.debug("Deleted temporary table '{}'", temporaryTable);
229+
} catch (IOException e) {
230+
LOG.error("Failed to load service account credentials: {}", e.getMessage(), e);
231+
}
232+
}
233+
234+
private void deleteGcsTemporaryDirectory() {
214235
org.apache.hadoop.fs.Path gcsPath = null;
215236
String bucket = config.getBucket();
216237
if (bucket == null) {
@@ -229,6 +250,12 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
229250
}
230251
}
231252

253+
@Nullable
254+
private Credentials getCredentials(@Nullable String serviceAccount) throws IOException {
255+
return serviceAccount == null ?
256+
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
257+
}
258+
232259
public Schema getSchema(FailureCollector collector) {
233260
com.google.cloud.bigquery.Schema bqSchema = getBQSchema(collector);
234261
return BigQueryUtil.getTableSchema(bqSchema, collector);

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
110110
TableReference sourceTable = new TableReference().setDatasetId(datasetId).setProjectId(inputProjectId)
111111
.setTableId(tableName);
112112
String location = bigQueryHelper.getTable(sourceTable).getLocation();
113-
String temporaryTableName = String.format("%s_%s", tableName, UUID.randomUUID().toString().replaceAll("-", "_"));
113+
String temporaryTableName = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
114114
TableReference exportTableReference = createExportTableReference(type, inputProjectId, datasetId,
115115
temporaryTableName, configuration);
116116
runQuery(bigQueryHelper, inputProjectId, exportTableReference, query, location);

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@ public interface BigQueryConstants {
4242
String CONFIG_PARTITION_INTEGER_RANGE_START = "cdap.bq.sink.partition.integer.range.start";
4343
String CONFIG_PARTITION_INTEGER_RANGE_END = "cdap.bq.sink.partition.integer.range.end";
4444
String CONFIG_PARTITION_INTEGER_RANGE_INTERVAL = "cdap.bq.sink.partition.integer.range.interval";
45+
String CONFIG_TEMPORARY_TABLE_NAME = "cdap.bq.source.temporary.table.name";
4546
}

0 commit comments

Comments
 (0)