Skip to content

Commit 4e2b578

Browse files
Use storage client for bucket cleanup
1 parent 6307e9e commit 4e2b578

File tree

12 files changed

+178
-44
lines changed

12 files changed

+178
-44
lines changed

.github/workflows/e2e.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ jobs:
6161
with:
6262
repository: cdapio/cdap-e2e-tests
6363
path: e2e
64+
ref: release/6.10
6465
- name: Cache
6566
uses: actions/cache@v4
6667
with:

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
<groupId>io.cdap.plugin</groupId>
2222
<artifactId>google-cloud</artifactId>
23-
<version>0.23.4</version>
23+
<version>0.23.5-SNAPSHOT</version>
2424
<name>Google Cloud Plugins</name>
2525
<packaging>jar</packaging>
2626
<description>Plugins for Google Big Query</description>

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,19 @@ public abstract class AbstractBigQuerySink extends BatchSink<StructuredRecord, S
8282
@Override
8383
public final void prepareRun(BatchSinkContext context) throws Exception {
8484
prepareRunValidation(context);
85-
85+
FailureCollector collector = context.getFailureCollector();
86+
Credentials credentials = null;
8687
AbstractBigQuerySinkConfig config = getConfig();
87-
String serviceAccount = config.getServiceAccount();
88-
Credentials credentials = serviceAccount == null ?
89-
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
88+
try {
89+
credentials = BigQuerySinkUtils.getCredentials(config.getConnection());
90+
} catch (Exception e) {
91+
String errorReason = "Unable to load service account credentials: ";
92+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
93+
.withStacktrace(e.getStackTrace());
94+
collector.getOrThrowException();
95+
}
9096
String project = config.getProject();
9197
bigQuery = GCPUtils.getBigQuery(project, credentials);
92-
FailureCollector collector = context.getFailureCollector();
9398
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
9499
collector.getOrThrowException();
95100
baseConfiguration = getBaseConfiguration(cmekKeyName);
@@ -121,17 +126,13 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
121126

122127
@Override
123128
public void onRunFinish(boolean succeeded, BatchSinkContext context) {
124-
String gcsPath;
125-
String bucket = getConfig().getBucket();
126-
if (bucket == null) {
127-
gcsPath = String.format("gs://%s", runUUID.toString());
128-
} else {
129-
gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString());
130-
}
131129
try {
132-
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
130+
Credentials credentials = BigQuerySinkUtils.getCredentials(getConfig().getConnection());
131+
Storage storage = GCPUtils.getStorage(getConfig().getProject(), credentials);
132+
BigQuerySinkUtils.cleanupGcsBucket(baseConfiguration, runUUID.toString(),
133+
getConfig().getBucket(), storage);
133134
} catch (IOException e) {
134-
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
135+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
135136
}
136137
}
137138

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.cdap.plugin.gcp.bigquery.sink;
1818

19+
import com.google.api.gax.paging.Page;
20+
import com.google.auth.Credentials;
1921
import com.google.cloud.bigquery.BigQuery;
2022
import com.google.cloud.bigquery.BigQueryException;
2123
import com.google.cloud.bigquery.Dataset;
@@ -29,9 +31,11 @@
2931
import com.google.cloud.bigquery.TableId;
3032
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
3133
import com.google.cloud.kms.v1.CryptoKeyName;
34+
import com.google.cloud.storage.Blob;
3235
import com.google.cloud.storage.Bucket;
3336
import com.google.cloud.storage.Storage;
3437
import com.google.cloud.storage.StorageException;
38+
import com.google.common.base.Strings;
3539
import com.google.common.reflect.TypeToken;
3640
import com.google.gson.Gson;
3741
import io.cdap.cdap.api.data.schema.Schema;
@@ -40,6 +44,7 @@
4044
import io.cdap.cdap.etl.api.validation.ValidationFailure;
4145
import io.cdap.plugin.common.Asset;
4246
import io.cdap.plugin.common.LineageRecorder;
47+
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
4348
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryOutputConfiguration;
4449
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
4550
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableSchema;
@@ -50,6 +55,8 @@
5055
import org.apache.hadoop.conf.Configuration;
5156
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
5257
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
58+
import org.slf4j.Logger;
59+
import org.slf4j.LoggerFactory;
5360

5461
import java.io.IOException;
5562
import java.lang.reflect.Type;
@@ -74,8 +81,12 @@
7481
*/
7582
public final class BigQuerySinkUtils {
7683

84+
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySinkUtils.class);
7785
public static final String GS_PATH_FORMAT = "gs://%s/%s";
7886
private static final String TEMPORARY_BUCKET_FORMAT = GS_PATH_FORMAT + "/input/%s-%s";
87+
private static final String BQ_TEMP_BUCKET_NAME_PREFIX = "bq-sink-bucket-";
88+
private static final String BQ_TEMP_BUCKET_NAME_TEMPLATE = BQ_TEMP_BUCKET_NAME_PREFIX + "%s";
89+
private static final String BQ_TEMP_BUCKET_PATH_TEMPLATE = "gs://" + BQ_TEMP_BUCKET_NAME_TEMPLATE;
7990
private static final String DATETIME = "DATETIME";
8091
private static final String RECORD = "RECORD";
8192
private static final String JSON = "JSON";
@@ -255,7 +266,7 @@ public static String configureBucket(Configuration baseConfiguration, @Nullable
255266
boolean deleteBucket = false;
256267
// If the bucket is null, assign the run ID as the bucket name and mark the bucket for deletion.
257268
if (bucket == null) {
258-
bucket = runId;
269+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
259270
deleteBucket = true;
260271
}
261272
return configureBucket(baseConfiguration, bucket, runId, deleteBucket);
@@ -982,4 +993,57 @@ private static void getJsonStringFieldsFromBQSchema(FieldList fieldList,
982993
path.remove(path.size() - 1);
983994
}
984995
}
996+
997+
/**
998+
* Deletes the temporary GCS directory used by a run; any failure is logged as a warning instead of being thrown.
999+
*
1000+
* @param configuration Hadoop Configuration.
1001+
* @param bucket the bucket name
1002+
* @param runId the run ID
1003+
*/
1004+
private static void deleteGcsTemporaryDirectory(Configuration configuration,
1005+
@Nullable String bucket, String runId) {
1006+
String gcsPath;
1007+
// If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
1008+
if (bucket == null) {
1009+
gcsPath = String.format(BQ_TEMP_BUCKET_PATH_TEMPLATE, runId);
1010+
} else {
1011+
gcsPath = String.format(GS_PATH_FORMAT, bucket, runId);
1012+
}
1013+
1014+
try {
1015+
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
1016+
} catch (Exception e) {
1017+
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
1018+
}
1019+
}
1020+
1021+
/**
1022+
* Loads the service account credentials configured on the given connection, or returns null when no service account is set.
1023+
*/
1024+
@Nullable
1025+
public static Credentials getCredentials(BigQueryConnectorConfig config) throws IOException {
1026+
return config.getServiceAccount() == null ?
1027+
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
1028+
config.isServiceAccountFilePath());
1029+
}
1030+
1031+
/**
1032+
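* Cleans up temporary GCS data for a run: removes only the run's temporary directory when a bucket name was supplied, otherwise deletes the temporary bucket named after the run ID.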
1033+
*/
1034+
public static void cleanupGcsBucket(Configuration configuration, String runId,
1035+
@Nullable String bucket, Storage storage) {
1036+
if (!Strings.isNullOrEmpty(bucket)) {
1037+
// A bucket name was supplied, so only remove this run's temporary directory; the bucket itself is deleted only when it was created for this run.
1038+
deleteGcsTemporaryDirectory(configuration, bucket, runId);
1039+
return;
1040+
}
1041+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
1042+
1043+
try {
1044+
BigQueryUtil.deleteGcsBucket(storage, bucket);
1045+
} catch (Exception e) {
1046+
LOG.warn("Failed to delete GCS bucket '{}': {}", bucket, e.getMessage(), e);
1047+
}
1048+
}
9851049
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.slf4j.Logger;
6565
import org.slf4j.LoggerFactory;
6666

67+
import java.io.IOException;
6768
import java.time.DateTimeException;
6869
import java.time.LocalDate;
6970
import java.util.List;
@@ -135,7 +136,16 @@ public void prepareRun(BatchSourceContext context) throws Exception {
135136

136137
// Create BigQuery client
137138
String serviceAccount = config.getServiceAccount();
138-
Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
139+
Credentials credentials = null;
140+
try {
141+
credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
142+
} catch (Exception e) {
143+
String errorReason = "Unable to load service account credentials: ";
144+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
145+
.withStacktrace(e.getStackTrace());
146+
collector.getOrThrowException();
147+
}
148+
139149
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
140150
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
141151
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
@@ -205,7 +215,13 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
205215

206216
@Override
207217
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
208-
BigQuerySourceUtils.deleteGcsTemporaryDirectory(configuration, config.getBucket(), bucketPath);
218+
try {
219+
Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
220+
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
221+
BigQuerySourceUtils.cleanupGcsBucket(configuration, bucketPath, config.getBucket(), storage);
222+
} catch (IOException e) {
223+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
224+
}
209225
BigQuerySourceUtils.deleteBigQueryTemporaryTable(configuration, config);
210226
}
211227

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.kms.v1.CryptoKeyName;
2626
import com.google.cloud.storage.Storage;
2727
import com.google.cloud.storage.StorageException;
28+
import com.google.common.base.Strings;
2829
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
2930
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
3031
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -167,15 +168,18 @@ public static String getTemporaryGcsPath(String bucket, String pathPrefix, Strin
167168
* @param configuration Hadoop Configuration.
168169
* @param config BigQuery source configuration.
169170
*/
170-
public static void deleteBigQueryTemporaryTable(Configuration configuration, BigQuerySourceConfig config) {
171+
public static void deleteBigQueryTemporaryTable(Configuration configuration,
172+
BigQuerySourceConfig config) {
171173
String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
172174
try {
173175
Credentials credentials = getCredentials(config.getConnection());
174176
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
175177
bigQuery.delete(TableId.of(config.getDatasetProject(), config.getDataset(), temporaryTable));
176178
LOG.debug("Deleted temporary table '{}'", temporaryTable);
177179
} catch (IOException e) {
178-
LOG.error("Failed to load service account credentials: {}", e.getMessage(), e);
180+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
181+
} catch (Exception e) {
182+
LOG.warn("Failed to delete temporary BQ table: '{}': {}", temporaryTable, e.getMessage(), e);
179183
}
180184
}
181185

@@ -187,8 +191,7 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
187191
* @param runId the run ID
188192
*/
189193
public static void deleteGcsTemporaryDirectory(Configuration configuration,
190-
String bucket,
191-
String runId) {
194+
@Nullable String bucket, String runId) {
192195
String gcsPath;
193196
// If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
194197
if (bucket == null) {
@@ -199,8 +202,27 @@ public static void deleteGcsTemporaryDirectory(Configuration configuration,
199202

200203
try {
201204
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
202-
} catch (IOException e) {
203-
LOG.error("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
205+
} catch (Exception e) {
206+
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
207+
}
208+
}
209+
210+
/**
211+
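* Cleans up temporary GCS data for a run: removes only the run's temporary directory when a bucket name was supplied, otherwise deletes the temporary bucket named after the run ID.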
212+
*/
213+
public static void cleanupGcsBucket(Configuration configuration, String runId,
214+
@Nullable String bucket, Storage storage) {
215+
if (!Strings.isNullOrEmpty(bucket)) {
216+
// A bucket name was supplied, so only remove this run's temporary directory; the bucket itself is deleted only when it was created for this run.
217+
deleteGcsTemporaryDirectory(configuration, bucket, runId);
218+
return;
219+
}
220+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
221+
222+
try {
223+
BigQueryUtil.deleteGcsBucket(storage, bucket);
224+
} catch (Exception e) {
225+
LOG.warn("Failed to delete GCS bucket '{}': {}", bucket, e.getMessage(), e);
204226
}
205227
}
206228
}

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,18 +190,17 @@ public void prepareRun(SQLEngineContext context) throws Exception {
190190
public void onRunFinish(boolean succeeded, SQLEngineContext context) {
191191
super.onRunFinish(succeeded, context);
192192

193-
String gcsPath;
194193
// If the bucket was created for this run, we should delete it.
195194
// Otherwise, just clean the directory within the provided bucket.
196-
if (sqlEngineConfig.getBucket() == null) {
197-
gcsPath = String.format("gs://%s", bucket);
198-
} else {
199-
gcsPath = String.format(BigQuerySinkUtils.GS_PATH_FORMAT, bucket, runId);
200-
}
201195
try {
202-
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
196+
String serviceAccount = sqlEngineConfig.getServiceAccount();
197+
Credentials credentials = serviceAccount == null ?
198+
null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
199+
sqlEngineConfig.isServiceAccountFilePath());
200+
Storage storage = GCPUtils.getStorage(sqlEngineConfig.getProject(), credentials);
201+
BigQuerySinkUtils.cleanupGcsBucket(configuration, runId, sqlEngineConfig.getBucket(), storage);
203202
} catch (IOException e) {
204-
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
203+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
205204
}
206205
}
207206

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.gcp.bigquery.util;
1818

1919
import com.google.api.client.googleapis.media.MediaHttpUploader;
20+
import com.google.api.gax.paging.Page;
2021
import com.google.cloud.bigquery.BigQuery;
2122
import com.google.cloud.bigquery.BigQueryException;
2223
import com.google.cloud.bigquery.Dataset;
@@ -30,6 +31,9 @@
3031
import com.google.cloud.bigquery.TimePartitioning;
3132
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
3233
import com.google.cloud.kms.v1.CryptoKeyName;
34+
import com.google.cloud.storage.Blob;
35+
import com.google.cloud.storage.BlobId;
36+
import com.google.cloud.storage.Storage;
3337
import com.google.common.annotations.VisibleForTesting;
3438
import com.google.common.base.Strings;
3539
import com.google.common.collect.ImmutableMap;
@@ -759,6 +763,26 @@ public static void deleteTemporaryDirectory(Configuration configuration, String
759763
}
760764
}
761765

766+
/**
767+
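* Deletes every object in the given GCS bucket, including all object versions, in batches of up to 100, and then deletes the bucket itself.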
768+
*/
769+
public static void deleteGcsBucket(Storage storage, String bucket) {
770+
Page<Blob> blobs = storage.list(bucket, Storage.BlobListOption.versions(true));
771+
List<BlobId> blobIds = new ArrayList<>();
772+
for (Blob blob : blobs.iterateAll()) {
773+
blobIds.add(blob.getBlobId());
774+
if (blobIds.size() == 100) {
775+
storage.delete(blobIds); // Batch delete
776+
blobIds.clear();
777+
}
778+
}
779+
if (!blobIds.isEmpty()) {
780+
storage.delete(blobIds);
781+
}
782+
storage.delete(bucket);
783+
LOG.debug("Deleted GCS bucket '{}'.", bucket);
784+
}
785+
762786
public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
763787
String partitionFromDate,
764788
String partitionToDate) {

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.cloud.storage.Storage;
3434
import com.google.cloud.storage.StorageException;
3535
import com.google.cloud.storage.StorageOptions;
36+
import com.google.common.collect.ImmutableMap;
3637
import com.google.gson.reflect.TypeToken;
3738
import io.cdap.plugin.gcp.gcs.GCSPath;
3839
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
@@ -284,8 +285,12 @@ public static void createBucket(Storage storage, String bucket, @Nullable String
284285
if (cmekKeyName != null) {
285286
builder.setDefaultKmsKeyName(cmekKeyName.toString());
286287
}
288+
// Add label to indicate bucket is created by cdap
289+
builder.setLabels(
290+
new ImmutableMap.Builder<String, String>().put("created_by", "cdap").build());
287291
storage.create(builder.build());
288292
}
293+
289294
/**
290295
* Formats a string as a component of a Fully-Qualified Name (FQN).
291296
*

0 commit comments

Comments
 (0)