2424import com .google .cloud .bigquery .StandardTableDefinition ;
2525import com .google .cloud .bigquery .Table ;
2626import com .google .cloud .bigquery .TableDefinition .Type ;
27+ import com .google .cloud .bigquery .TableId ;
2728import com .google .cloud .bigquery .TimePartitioning ;
2829import com .google .cloud .hadoop .io .bigquery .BigQueryConfiguration ;
2930import io .cdap .cdap .api .annotation .Description ;
@@ -118,8 +119,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
118119 Schema configuredSchema = getOutputSchema (collector );
119120
120121 String serviceAccount = config .getServiceAccount ();
121- Credentials credentials = serviceAccount == null ?
122- null : GCPUtils .loadServiceAccountCredentials (serviceAccount , config .isServiceAccountFilePath ());
122+ Credentials credentials = getCredentials (serviceAccount );
123123 BigQuery bigQuery = GCPUtils .getBigQuery (config .getDatasetProject (), credentials );
124124
125125 uuid = UUID .randomUUID ();
@@ -163,6 +163,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {
163163 if (config .getViewMaterializationDataset () != null ) {
164164 configuration .set (BigQueryConstants .CONFIG_VIEW_MATERIALIZATION_DATASET , config .getViewMaterializationDataset ());
165165 }
166+ String temporaryTableName = String .format ("_%s_%s" , config .getTable (),
167+ UUID .randomUUID ().toString ().replaceAll ("-" , "_" ));
168+ configuration .set (BigQueryConstants .CONFIG_TEMPORARY_TABLE_NAME , temporaryTableName );
166169
167170 String temporaryGcsPath = String .format ("gs://%s/%s/hadoop/input/%s" , bucket , uuid , uuid );
168171 PartitionedBigQueryInputFormat .setTemporaryCloudStorageDirectory (configuration , temporaryGcsPath );
@@ -203,6 +206,24 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
203206
204207 @ Override
205208 public void onRunFinish (boolean succeeded , BatchSourceContext context ) {
209+ deleteGcsTemporaryDirectory ();
210+ deleteBigQueryTemporaryTable ();
211+ }
212+
213+ private void deleteBigQueryTemporaryTable () {
214+ String temporaryTable = configuration .get (BigQueryConstants .CONFIG_TEMPORARY_TABLE_NAME );
215+ try {
216+ String serviceAccount = config .getServiceAccount ();
217+ Credentials credentials = getCredentials (serviceAccount );
218+ BigQuery bigQuery = GCPUtils .getBigQuery (config .getDatasetProject (), credentials );
219+ bigQuery .delete (TableId .of (config .getProject (), config .getDataset (), temporaryTable ));
220+ LOG .debug ("Deleted temporary table '{}'" , temporaryTable );
221+ } catch (IOException e ) {
222+ LOG .error ("Failed to load service account credentials: {}" , e .getMessage (), e );
223+ }
224+ }
225+
226+ private void deleteGcsTemporaryDirectory () {
206227 org .apache .hadoop .fs .Path gcsPath = null ;
207228 String bucket = config .getBucket ();
208229 if (bucket == null ) {
@@ -221,6 +242,12 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
221242 }
222243 }
223244
245+ @ Nullable
246+ private Credentials getCredentials (@ Nullable String serviceAccount ) throws IOException {
247+ return serviceAccount == null ?
248+ null : GCPUtils .loadServiceAccountCredentials (serviceAccount , config .isServiceAccountFilePath ());
249+ }
250+
224251 public Schema getSchema (FailureCollector collector ) {
225252 com .google .cloud .bigquery .Schema bqSchema = getBQSchema (collector );
226253 return BigQueryUtil .getTableSchema (bqSchema , collector );
0 commit comments