diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index fac9b2ffea..b0f85a1e55 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -61,6 +61,7 @@ jobs:
with:
repository: cdapio/cdap-e2e-tests
path: e2e
+ ref: release/6.10
- name: Cache
uses: actions/cache@v4
with:
diff --git a/pom.xml b/pom.xml
index 68ee9302a6..f584182615 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
io.cdap.plugin
google-cloud
- 0.23.4
+ 0.23.5-SNAPSHOT
Google Cloud Plugins
jar
Plugins for Google Big Query
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
index 5c2980e4db..bac8474179 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
@@ -82,14 +82,19 @@ public abstract class AbstractBigQuerySink
@Override
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
- BigQuerySourceUtils.deleteGcsTemporaryDirectory(configuration, config.getBucket(), bucketPath);
+ try {
+ Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
+ Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
+ BigQuerySourceUtils.cleanupGcsBucket(configuration, bucketPath, config.getBucket(), storage);
+ } catch (IOException e) {
+ LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
+ }
BigQuerySourceUtils.deleteBigQueryTemporaryTable(configuration, config);
}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
index aa95d743ac..a32228c558 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
@@ -25,6 +25,7 @@
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
+import com.google.common.base.Strings;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -167,7 +168,8 @@ public static String getTemporaryGcsPath(String bucket, String pathPrefix, Strin
* @param configuration Hadoop Configuration.
* @param config BigQuery source configuration.
*/
- public static void deleteBigQueryTemporaryTable(Configuration configuration, BigQuerySourceConfig config) {
+ public static void deleteBigQueryTemporaryTable(Configuration configuration,
+ BigQuerySourceConfig config) {
String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
try {
Credentials credentials = getCredentials(config.getConnection());
@@ -175,7 +177,9 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
bigQuery.delete(TableId.of(config.getDatasetProject(), config.getDataset(), temporaryTable));
LOG.debug("Deleted temporary table '{}'", temporaryTable);
} catch (IOException e) {
- LOG.error("Failed to load service account credentials: {}", e.getMessage(), e);
+ LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
+ } catch (Exception e) {
+ LOG.warn("Failed to delete temporary BQ table: '{}': {}", temporaryTable, e.getMessage(), e);
}
}
@@ -187,8 +191,7 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
* @param runId the run ID
*/
public static void deleteGcsTemporaryDirectory(Configuration configuration,
- String bucket,
- String runId) {
+ @Nullable String bucket, String runId) {
String gcsPath;
// If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
if (bucket == null) {
@@ -199,8 +202,27 @@ public static void deleteGcsTemporaryDirectory(Configuration configuration,
try {
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
- } catch (IOException e) {
- LOG.error("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
+ }
+ }
+
+ /**
+ * Cleanup temporary GCS bucket if created.
+ */
+ public static void cleanupGcsBucket(Configuration configuration, String runId,
+ @Nullable String bucket, Storage storage) {
+ if (!Strings.isNullOrEmpty(bucket)) {
+ // Only need to delete the bucket if it was created for this run
+ deleteGcsTemporaryDirectory(configuration, bucket, runId);
+ return;
+ }
+ bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
+
+ try {
+ BigQueryUtil.deleteGcsBucket(storage, bucket);
+ } catch (Exception e) {
+ LOG.warn("Failed to delete GCS bucket '{}': {}", bucket, e.getMessage(), e);
}
}
}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
index 9b2632f2e3..a11895f5ec 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
@@ -190,18 +190,17 @@ public void prepareRun(SQLEngineContext context) throws Exception {
public void onRunFinish(boolean succeeded, SQLEngineContext context) {
super.onRunFinish(succeeded, context);
- String gcsPath;
// If the bucket was created for this run, we should delete it.
// Otherwise, just clean the directory within the provided bucket.
- if (sqlEngineConfig.getBucket() == null) {
- gcsPath = String.format("gs://%s", bucket);
- } else {
- gcsPath = String.format(BigQuerySinkUtils.GS_PATH_FORMAT, bucket, runId);
- }
try {
- BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
+ String serviceAccount = sqlEngineConfig.getServiceAccount();
+ Credentials credentials = serviceAccount == null ?
+ null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
+ sqlEngineConfig.isServiceAccountFilePath());
+ Storage storage = GCPUtils.getStorage(sqlEngineConfig.getProject(), credentials);
+ BigQuerySinkUtils.cleanupGcsBucket(configuration, runId, sqlEngineConfig.getBucket(), storage);
} catch (IOException e) {
- LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
+ LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
}
}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
index 0f3fc80148..db39dff861 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
@@ -17,6 +17,7 @@
package io.cdap.plugin.gcp.bigquery.util;
import com.google.api.client.googleapis.media.MediaHttpUploader;
+import com.google.api.gax.paging.Page;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Dataset;
@@ -30,6 +31,9 @@
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@@ -759,6 +763,26 @@ public static void deleteTemporaryDirectory(Configuration configuration, String
}
}
+ /**
+ * Deletes the GCS bucket.
+ */
+ public static void deleteGcsBucket(Storage storage, String bucket) {
+ Page blobs = storage.list(bucket, Storage.BlobListOption.versions(true));
+ List blobIds = new ArrayList<>();
+ for (Blob blob : blobs.iterateAll()) {
+ blobIds.add(blob.getBlobId());
+ if (blobIds.size() == 100) {
+ storage.delete(blobIds); // Batch delete
+ blobIds.clear();
+ }
+ }
+ if (!blobIds.isEmpty()) {
+ storage.delete(blobIds);
+ }
+ storage.delete(bucket);
+ LOG.debug("Deleted GCS bucket '{}'.", bucket);
+ }
+
public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
String partitionFromDate,
String partitionToDate) {
diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
index 7351b628b3..13d1a21b74 100644
--- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
@@ -33,6 +33,7 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
+import com.google.common.collect.ImmutableMap;
import com.google.gson.reflect.TypeToken;
import io.cdap.plugin.gcp.gcs.GCSPath;
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
@@ -284,8 +285,12 @@ public static void createBucket(Storage storage, String bucket, @Nullable String
if (cmekKeyName != null) {
builder.setDefaultKmsKeyName(cmekKeyName.toString());
}
+ // Add label to indicate bucket is created by cdap
+ builder.setLabels(
+ new ImmutableMap.Builder<String, String>().put("created_by", "cdap").build());
storage.create(builder.build());
}
+
/**
* Formats a string as a component of a Fully-Qualified Name (FQN).
*
diff --git a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java
index 43da64e7bd..cd82193e25 100644
--- a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java
@@ -81,8 +81,6 @@
import io.cdap.plugin.gcp.gcs.StorageClient;
import io.cdap.plugin.gcp.gcs.sink.GCSBatchSink;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
@@ -249,16 +247,18 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
return;
}
- Path gcsPath = new Path(DataplexConstants.STORAGE_BUCKET_PATH_PREFIX + runUUID);
try {
- FileSystem fs = gcsPath.getFileSystem(baseConfiguration);
- if (fs.exists(gcsPath)) {
- fs.delete(gcsPath, true);
- LOG.debug("Deleted temporary directory '{}'", gcsPath);
- }
- emitMetricsForBigQueryDataset(succeeded, context);
+ String serviceAccount = config.getServiceAccount();
+ Credentials credentials = serviceAccount == null ? null
+ : GCPUtils.loadServiceAccountCredentials(serviceAccount,
+ config.isServiceAccountFilePath());
+ Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
+ BigQuerySinkUtils.cleanupGcsBucket(baseConfiguration, runUUID.toString(), null, storage);
} catch (IOException e) {
- LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
+ LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
+ }
+ try {
+ emitMetricsForBigQueryDataset(succeeded, context);
} catch (Exception exception) {
LOG.warn("Exception while trying to emit metric. No metric will be emitted for the number of affected rows.",
exception);
diff --git a/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java b/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java
index 043889cfbe..5ef699940f 100644
--- a/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java
+++ b/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java
@@ -379,9 +379,10 @@ private void recordLineage(LineageRecorder lineageRecorder, List outputF
@Override
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
if (entity.getSystem().equals(StorageSystem.BIGQUERY)) {
- BigQuerySourceUtils.deleteGcsTemporaryDirectory(configuration, null, bucketPath);
String temporaryTable = configuration.get(CONFIG_TEMPORARY_TABLE_NAME);
Credentials credentials = config.getCredentials(context.getFailureCollector());
+ Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
+ BigQuerySourceUtils.cleanupGcsBucket(configuration, bucketPath, null, storage);
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
bigQuery.delete(TableId.of(datasetProject, dataset, temporaryTable));
LOG.debug("Deleted temporary table '{}'", temporaryTable);
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java
index 9cb0490f31..8c8a82f58d 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java
@@ -48,7 +48,8 @@ public void testConfigureNullBucket() {
BigQuerySinkUtils.configureBucket(configuration, null, "some-run-id");
Assert.assertTrue(configuration.getBoolean("fs.gs.bucket.delete.enable", false));
- Assert.assertEquals("gs://some-run-id/some-run-id", configuration.get("fs.default.name"));
+ Assert.assertEquals("gs://bq-sink-bucket-some-run-id/some-run-id",
+ configuration.get("fs.default.name"));
Assert.assertTrue(configuration.getBoolean("fs.gs.impl.disable.cache", false));
Assert.assertFalse(configuration.getBoolean("fs.gs.metadata.cache.enable", true));
}