diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
index 375733ff5..d0ec48334 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
@@ -31,12 +31,10 @@
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.dataset.lib.KeyValue;
 import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.cdap.api.exception.ProgramFailureException;
 import io.cdap.cdap.etl.api.Emitter;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
-import io.cdap.cdap.etl.api.exception.ErrorContext;
 import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
 import io.cdap.plugin.common.Asset;
 import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
@@ -88,13 +86,18 @@ public abstract class AbstractBigQuerySink extends BatchSink input, Emitter<
   @Override
   public void onRunFinish(boolean succeeded, BatchSourceContext context) {
-    BigQuerySourceUtils.deleteGcsTemporaryDirectory(configuration, config.getBucket(), bucketPath);
+    try {
+      Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
+      Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
+      BigQuerySourceUtils.cleanupGcsBucket(configuration, bucketPath, config.getBucket(), storage);
+    } catch (IOException e) {
+      LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
+    }
     BigQuerySourceUtils.deleteBigQueryTemporaryTable(configuration, config);
   }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
index 56135c479..aa44e48f9 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
@@ -25,6 +25,7 @@
 import com.google.cloud.kms.v1.CryptoKeyName;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
+import com.google.common.base.Strings;
 import io.cdap.cdap.api.exception.ErrorCategory;
 import io.cdap.cdap.api.exception.ErrorCodeType;
 import io.cdap.cdap.api.exception.ErrorType;
@@ -174,15 +175,19 @@ public static String getTemporaryGcsPath(String bucket, String pathPrefix, Strin
    * @param configuration Hadoop Configuration.
    * @param config BigQuery source configuration.
    */
-  public static void deleteBigQueryTemporaryTable(Configuration configuration, BigQuerySourceConfig config) {
+  public static void deleteBigQueryTemporaryTable(Configuration configuration,
+                                                  BigQuerySourceConfig config) {
     String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
     try {
       Credentials credentials = getCredentials(config.getConnection());
-      BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
+      BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials,
+                                               GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
       bigQuery.delete(TableId.of(config.getDatasetProject(), config.getDataset(), temporaryTable));
       LOG.debug("Deleted temporary table '{}'", temporaryTable);
     } catch (IOException e) {
-      LOG.error("Failed to load service account credentials: {}", e.getMessage(), e);
+      LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
+    } catch (Exception e) {
+      LOG.warn("Failed to delete temporary BQ table: '{}': {}", temporaryTable, e.getMessage(), e);
     }
   }
@@ -194,8 +199,7 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
    * @param runId the run ID
    */
   public static void deleteGcsTemporaryDirectory(Configuration configuration,
-                                                 String bucket,
-                                                 String runId) {
+                                                 @Nullable String bucket, String runId) {
     String gcsPath;
     // If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
     if (bucket == null) {
@@ -206,8 +210,27 @@ public static void deleteGcsTemporaryDirectory(Configuration configuration,
     try {
       BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
-    } catch (IOException e) {
-      LOG.error("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
+    } catch (Exception e) {
+      LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
+    }
+  }
+
+  /**
+   * Cleanup temporary GCS bucket if created.
+   */
+  public static void cleanupGcsBucket(Configuration configuration, String runId,
+                                      @Nullable String bucket, Storage storage) {
+    if (!Strings.isNullOrEmpty(bucket)) {
+      // Only need to delete the bucket if it was created for this run
+      deleteGcsTemporaryDirectory(configuration, bucket, runId);
+      return;
+    }
+    bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
+
+    try {
+      BigQueryUtil.deleteGcsBucket(storage, bucket);
+    } catch (Exception e) {
+      LOG.warn("Failed to delete GCS bucket '{}': {}", bucket, e.getMessage(), e);
     }
   }
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
index 4d80fd2a7..493ee292b 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
@@ -190,18 +190,17 @@ public void prepareRun(SQLEngineContext context) throws Exception {
   public void onRunFinish(boolean succeeded, SQLEngineContext context) {
     super.onRunFinish(succeeded, context);
 
-    String gcsPath;
     // If the bucket was created for this run, we should delete it.
     // Otherwise, just clean the directory within the provided bucket.
-    if (sqlEngineConfig.getBucket() == null) {
-      gcsPath = String.format("gs://%s", bucket);
-    } else {
-      gcsPath = String.format(BigQuerySinkUtils.GS_PATH_FORMAT, bucket, runId);
-    }
     try {
-      BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
+      String serviceAccount = sqlEngineConfig.getServiceAccount();
+      Credentials credentials = serviceAccount == null ?
+        null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
+                                                      sqlEngineConfig.isServiceAccountFilePath());
+      Storage storage = GCPUtils.getStorage(sqlEngineConfig.getProject(), credentials);
+      BigQuerySinkUtils.cleanupGcsBucket(configuration, runId, sqlEngineConfig.getBucket(), storage);
     } catch (IOException e) {
-      LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
+      LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
     }
   }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
index f816a5163..be2b88ba0 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
@@ -17,6 +17,7 @@
 package io.cdap.plugin.gcp.bigquery.util;
 
 import com.google.api.client.googleapis.media.MediaHttpUploader;
+import com.google.api.gax.paging.Page;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.Dataset;
@@ -30,6 +31,9 @@
 import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
 import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
@@ -713,6 +717,26 @@ public static void deleteTemporaryDirectory(Configuration configuration, String
     }
   }
 
+  /**
+   * Deletes the GCS bucket.
+   */
+  public static void deleteGcsBucket(Storage storage, String bucket) {
+    Page<Blob> blobs = storage.list(bucket, Storage.BlobListOption.versions(true));
+    List<BlobId> blobIds = new ArrayList<>();
+    for (Blob blob : blobs.iterateAll()) {
+      blobIds.add(blob.getBlobId());
+      if (blobIds.size() == 100) {
+        storage.delete(blobIds); // Batch delete
+        blobIds.clear();
+      }
+    }
+    if (!blobIds.isEmpty()) {
+      storage.delete(blobIds);
+    }
+    storage.delete(bucket);
+    LOG.debug("Deleted GCS bucket '{}'.", bucket);
+  }
+
   public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
                                                       String partitionFromDate,
                                                       String partitionToDate) {
diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
index 1d92d3553..708065c8a 100644
--- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
@@ -34,6 +34,7 @@
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
+import com.google.common.collect.ImmutableMap;
 import com.google.gson.reflect.TypeToken;
 import io.cdap.plugin.gcp.gcs.GCSPath;
 import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
@@ -304,8 +305,12 @@ public static void createBucket(Storage storage, String bucket, @Nullable String
     if (cmekKeyName != null) {
       builder.setDefaultKmsKeyName(cmekKeyName.toString());
     }
+    // Add label to indicate bucket is created by cdap
+    builder.setLabels(
+      new ImmutableMap.Builder<String, String>().put("created_by", "cdap").build());
     storage.create(builder.build());
   }
+
   /**
    * Formats a string as a component of a Fully-Qualified Name (FQN).
    *
diff --git a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java
index dd90f7760..2a344e56e 100644
--- a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java
@@ -81,8 +81,6 @@
 import io.cdap.plugin.gcp.gcs.StorageClient;
 import io.cdap.plugin.gcp.gcs.sink.GCSBatchSink;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.slf4j.Logger;
@@ -249,16 +247,18 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
       return;
     }
 
-    Path gcsPath = new Path(DataplexConstants.STORAGE_BUCKET_PATH_PREFIX + runUUID);
     try {
-      FileSystem fs = gcsPath.getFileSystem(baseConfiguration);
-      if (fs.exists(gcsPath)) {
-        fs.delete(gcsPath, true);
-        LOG.debug("Deleted temporary directory '{}'", gcsPath);
-      }
-      emitMetricsForBigQueryDataset(succeeded, context);
+      String serviceAccount = config.getServiceAccount();
+      Credentials credentials = serviceAccount == null ? null
+        : GCPUtils.loadServiceAccountCredentials(serviceAccount,
+                                                 config.isServiceAccountFilePath());
+      Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
+      BigQuerySinkUtils.cleanupGcsBucket(baseConfiguration, runUUID.toString(), null, storage);
     } catch (IOException e) {
-      LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
+      LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
+    }
+    try {
+      emitMetricsForBigQueryDataset(succeeded, context);
     } catch (Exception exception) {
       LOG.warn("Exception while trying to emit metric. No metric will be emitted for the number of affected rows.",
                exception);
diff --git a/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java b/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java
index 6989bd5d5..c21ada45a 100644
--- a/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java
+++ b/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java
@@ -379,9 +379,10 @@ private void recordLineage(LineageRecorder lineageRecorder, List outputF
   @Override
   public void onRunFinish(boolean succeeded, BatchSourceContext context) {
     if (entity.getSystem().equals(StorageSystem.BIGQUERY)) {
-      BigQuerySourceUtils.deleteGcsTemporaryDirectory(configuration, null, bucketPath);
       String temporaryTable = configuration.get(CONFIG_TEMPORARY_TABLE_NAME);
       Credentials credentials = config.getCredentials(context.getFailureCollector());
+      Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
+      BigQuerySourceUtils.cleanupGcsBucket(configuration, bucketPath, null, storage);
       BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
       bigQuery.delete(TableId.of(datasetProject, dataset, temporaryTable));
       LOG.debug("Deleted temporary table '{}'", temporaryTable);
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java
index 9cb0490f3..8c8a82f58 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java
@@ -48,7 +48,8 @@ public void testConfigureNullBucket() {
     BigQuerySinkUtils.configureBucket(configuration, null, "some-run-id");
 
     Assert.assertTrue(configuration.getBoolean("fs.gs.bucket.delete.enable", false));
-    Assert.assertEquals("gs://some-run-id/some-run-id", configuration.get("fs.default.name"));
+    Assert.assertEquals("gs://bq-sink-bucket-some-run-id/some-run-id",
+                        configuration.get("fs.default.name"));
     Assert.assertTrue(configuration.getBoolean("fs.gs.impl.disable.cache", false));
     Assert.assertFalse(configuration.getBoolean("fs.gs.metadata.cache.enable", true));
   }
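
Note (illustration, not part of the patch): a minimal, standalone sketch of the cleanup flow that the new BigQueryUtil.deleteGcsBucket and cleanupGcsBucket code performs, written against the standard google-cloud-storage client. The "bq-sink-bucket-<run-id>" name follows the value asserted in BigQuerySinkUtilsTest; the created_by=cdap label check is an extra guard suggested by the new label in GCPUtils.createBucket, not something the patch itself does.

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.ArrayList;
import java.util.List;

public class TempBucketCleanupSketch {

  public static void main(String[] args) {
    String runId = args[0];
    // Assumption: sink-side temp buckets are named "bq-sink-bucket-<run-id>" (see BigQuerySinkUtilsTest).
    String bucket = "bq-sink-bucket-" + runId;
    Storage storage = StorageOptions.getDefaultInstance().getService();

    // Extra guard, not in the patch: only touch buckets carrying the created_by=cdap label
    // that GCPUtils.createBucket now sets.
    Bucket info = storage.get(bucket);
    if (info == null || info.getLabels() == null || !"cdap".equals(info.getLabels().get("created_by"))) {
      return;
    }

    // Same pattern as the new BigQueryUtil.deleteGcsBucket: list every object version and
    // delete them in batches of 100, because a bucket can only be removed once it is empty.
    List<BlobId> batch = new ArrayList<>();
    for (Blob blob : storage.list(bucket, Storage.BlobListOption.versions(true)).iterateAll()) {
      batch.add(blob.getBlobId());
      if (batch.size() == 100) {
        storage.delete(batch);
        batch.clear();
      }
    }
    if (!batch.isEmpty()) {
      storage.delete(batch);
    }
    // Finally drop the now-empty bucket itself.
    storage.delete(bucket);
  }
}

Batching the BlobId deletes keeps round trips down when a run leaves behind many temporary shards; the batch size of 100 mirrors the value used in deleteGcsBucket.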