2424import com .google .cloud .bigquery .StandardTableDefinition ;
2525import com .google .cloud .bigquery .Table ;
2626import com .google .cloud .bigquery .TableDefinition .Type ;
27+ import com .google .cloud .bigquery .TableId ;
2728import com .google .cloud .bigquery .TimePartitioning ;
2829import com .google .cloud .hadoop .io .bigquery .BigQueryConfiguration ;
2930import io .cdap .cdap .api .annotation .Description ;
@@ -126,8 +127,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
126127 Schema configuredSchema = getOutputSchema (collector );
127128
128129 String serviceAccount = config .getServiceAccount ();
129- Credentials credentials = serviceAccount == null ?
130- null : GCPUtils .loadServiceAccountCredentials (serviceAccount , config .isServiceAccountFilePath ());
130+ Credentials credentials = getCredentials (serviceAccount );
131131 BigQuery bigQuery = GCPUtils .getBigQuery (config .getDatasetProject (), credentials );
132132
133133 uuid = UUID .randomUUID ();
@@ -171,6 +171,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {
171171 if (config .getViewMaterializationDataset () != null ) {
172172 configuration .set (BigQueryConstants .CONFIG_VIEW_MATERIALIZATION_DATASET , config .getViewMaterializationDataset ());
173173 }
174+ String temporaryTableName = String .format ("_%s_%s" , config .getTable (),
175+ UUID .randomUUID ().toString ().replaceAll ("-" , "_" ));
176+ configuration .set (BigQueryConstants .CONFIG_TEMPORARY_TABLE_NAME , temporaryTableName );
174177
175178 String temporaryGcsPath = String .format ("gs://%s/%s/hadoop/input/%s" , bucket , uuid , uuid );
176179 PartitionedBigQueryInputFormat .setTemporaryCloudStorageDirectory (configuration , temporaryGcsPath );
@@ -211,6 +214,24 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
211214
212215 @ Override
213216 public void onRunFinish (boolean succeeded , BatchSourceContext context ) {
217+ deleteGcsTemporaryDirectory ();
218+ deleteBigQueryTemporaryTable ();
219+ }
220+
221+ private void deleteBigQueryTemporaryTable () {
222+ String temporaryTable = configuration .get (BigQueryConstants .CONFIG_TEMPORARY_TABLE_NAME );
223+ try {
224+ String serviceAccount = config .getServiceAccount ();
225+ Credentials credentials = getCredentials (serviceAccount );
226+ BigQuery bigQuery = GCPUtils .getBigQuery (config .getDatasetProject (), credentials );
227+ bigQuery .delete (TableId .of (config .getProject (), config .getDataset (), temporaryTable ));
228+ LOG .debug ("Deleted temporary table '{}'" , temporaryTable );
229+ } catch (IOException e ) {
230+ LOG .error ("Failed to load service account credentials: {}" , e .getMessage (), e );
231+ }
232+ }
233+
234+ private void deleteGcsTemporaryDirectory () {
214235 org .apache .hadoop .fs .Path gcsPath = null ;
215236 String bucket = config .getBucket ();
216237 if (bucket == null ) {
@@ -229,6 +250,12 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
229250 }
230251 }
231252
253+ @ Nullable
254+ private Credentials getCredentials (@ Nullable String serviceAccount ) throws IOException {
255+ return serviceAccount == null ?
256+ null : GCPUtils .loadServiceAccountCredentials (serviceAccount , config .isServiceAccountFilePath ());
257+ }
258+
232259 public Schema getSchema (FailureCollector collector ) {
233260 com .google .cloud .bigquery .Schema bqSchema = getBQSchema (collector );
234261 return BigQueryUtil .getTableSchema (bqSchema , collector );
0 commit comments