Skip to content

Commit c13bece

Browse files
Merge pull request #1552 from data-integrations/PLUGIN-1844
[PLUGIN-1844] Add label and use storage client for bucket cleanup.
2 parents d991ce7 + 92da91c commit c13bece

File tree

10 files changed

+170
-47
lines changed

10 files changed

+170
-47
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,10 @@
3131
import io.cdap.cdap.api.data.schema.Schema;
3232
import io.cdap.cdap.api.dataset.lib.KeyValue;
3333
import io.cdap.cdap.api.exception.ErrorType;
34-
import io.cdap.cdap.api.exception.ProgramFailureException;
3534
import io.cdap.cdap.etl.api.Emitter;
3635
import io.cdap.cdap.etl.api.FailureCollector;
3736
import io.cdap.cdap.etl.api.batch.BatchSink;
3837
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
39-
import io.cdap.cdap.etl.api.exception.ErrorContext;
4038
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
4139
import io.cdap.plugin.common.Asset;
4240
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
@@ -88,13 +86,18 @@ public abstract class AbstractBigQuerySink extends BatchSink<StructuredRecord, S
8886
@Override
8987
public final void prepareRun(BatchSinkContext context) throws Exception {
9088
prepareRunValidation(context);
91-
89+
FailureCollector collector = context.getFailureCollector();
90+
Credentials credentials = null;
9291
AbstractBigQuerySinkConfig config = getConfig();
93-
String serviceAccount = config.getServiceAccount();
94-
Credentials credentials = serviceAccount == null ?
95-
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
92+
try {
93+
credentials = BigQuerySinkUtils.getCredentials(config.getConnection());
94+
} catch (Exception e) {
95+
String errorReason = "Unable to load service account credentials: ";
96+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
97+
.withStacktrace(e.getStackTrace());
98+
collector.getOrThrowException();
99+
}
96100
String project = config.getProject();
97-
FailureCollector collector = context.getFailureCollector();
98101
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
99102
collector.getOrThrowException();
100103
baseConfiguration = getBaseConfiguration(cmekKeyName);
@@ -143,17 +146,13 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
143146

144147
@Override
145148
public void onRunFinish(boolean succeeded, BatchSinkContext context) {
146-
String gcsPath;
147-
String bucket = getConfig().getBucket();
148-
if (bucket == null) {
149-
gcsPath = String.format("gs://%s", runUUID);
150-
} else {
151-
gcsPath = String.format(gcsPathFormat, bucket, runUUID);
152-
}
153149
try {
154-
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
150+
Credentials credentials = BigQuerySinkUtils.getCredentials(getConfig().getConnection());
151+
Storage storage = GCPUtils.getStorage(getConfig().getProject(), credentials);
152+
BigQuerySinkUtils.cleanupGcsBucket(baseConfiguration, runUUID.toString(),
153+
getConfig().getBucket(), storage);
155154
} catch (IOException e) {
156-
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
155+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
157156
}
158157
}
159158

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.cdap.plugin.gcp.bigquery.sink;
1818

19+
import com.google.api.gax.paging.Page;
20+
import com.google.auth.Credentials;
1921
import com.google.cloud.bigquery.BigQuery;
2022
import com.google.cloud.bigquery.BigQueryException;
2123
import com.google.cloud.bigquery.Dataset;
@@ -29,9 +31,11 @@
2931
import com.google.cloud.bigquery.TableId;
3032
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
3133
import com.google.cloud.kms.v1.CryptoKeyName;
34+
import com.google.cloud.storage.Blob;
3235
import com.google.cloud.storage.Bucket;
3336
import com.google.cloud.storage.Storage;
3437
import com.google.cloud.storage.StorageException;
38+
import com.google.common.base.Strings;
3539
import com.google.common.reflect.TypeToken;
3640
import com.google.gson.Gson;
3741
import io.cdap.cdap.api.data.schema.Schema;
@@ -44,6 +48,7 @@
4448
import io.cdap.cdap.etl.api.validation.ValidationFailure;
4549
import io.cdap.plugin.common.Asset;
4650
import io.cdap.plugin.common.LineageRecorder;
51+
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
4752
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryOutputConfiguration;
4853
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
4954
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableSchema;
@@ -54,6 +59,8 @@
5459
import org.apache.hadoop.conf.Configuration;
5560
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
5661
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
62+
import org.slf4j.Logger;
63+
import org.slf4j.LoggerFactory;
5764

5865
import java.io.IOException;
5966
import java.lang.reflect.Type;
@@ -78,8 +85,12 @@
7885
*/
7986
public final class BigQuerySinkUtils {
8087

88+
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySinkUtils.class);
8189
public static final String GS_PATH_FORMAT = "gs://%s/%s";
8290
private static final String TEMPORARY_BUCKET_FORMAT = GS_PATH_FORMAT + "/input/%s-%s";
91+
private static final String BQ_TEMP_BUCKET_NAME_PREFIX = "bq-sink-bucket-";
92+
private static final String BQ_TEMP_BUCKET_NAME_TEMPLATE = BQ_TEMP_BUCKET_NAME_PREFIX + "%s";
93+
private static final String BQ_TEMP_BUCKET_PATH_TEMPLATE = "gs://" + BQ_TEMP_BUCKET_NAME_TEMPLATE;
8394
private static final String DATETIME = "DATETIME";
8495
private static final String RECORD = "RECORD";
8596
private static final String JSON = "JSON";
@@ -270,7 +281,7 @@ public static String configureBucket(Configuration baseConfiguration, @Nullable
270281
boolean deleteBucket = false;
271282
// If the bucket is null, assign the run ID as the bucket name and mark the bucket for deletion.
272283
if (bucket == null) {
273-
bucket = runId;
284+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
274285
deleteBucket = true;
275286
}
276287
return configureBucket(baseConfiguration, bucket, runId, deleteBucket);
@@ -1004,4 +1015,57 @@ private static void getJsonStringFieldsFromBQSchema(FieldList fieldList,
10041015
path.remove(path.size() - 1);
10051016
}
10061017
}
1018+
1019+
/**
1020+
* Deletes temporary GCS directory.
1021+
*
1022+
* @param configuration Hadoop Configuration.
1023+
* @param bucket the bucket name
1024+
* @param runId the run ID
1025+
*/
1026+
private static void deleteGcsTemporaryDirectory(Configuration configuration,
1027+
@Nullable String bucket, String runId) {
1028+
String gcsPath;
1029+
// If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
1030+
if (bucket == null) {
1031+
gcsPath = String.format(BQ_TEMP_BUCKET_PATH_TEMPLATE, runId);
1032+
} else {
1033+
gcsPath = String.format(GS_PATH_FORMAT, bucket, runId);
1034+
}
1035+
1036+
try {
1037+
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
1038+
} catch (Exception e) {
1039+
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
1040+
}
1041+
}
1042+
1043+
/**
1044+
* Returns the serviceAccountCredentials if present in the config, otherwise null.
1045+
*/
1046+
@Nullable
1047+
public static Credentials getCredentials(BigQueryConnectorConfig config) throws IOException {
1048+
return config.getServiceAccount() == null ?
1049+
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
1050+
config.isServiceAccountFilePath());
1051+
}
1052+
1053+
/**
1054+
* Cleanup temporary GCS bucket if created.
1055+
*/
1056+
public static void cleanupGcsBucket(Configuration configuration, String runId,
1057+
@Nullable String bucket, Storage storage) {
1058+
if (!Strings.isNullOrEmpty(bucket)) {
1059+
// Only need to delete the bucket if it was created for this run
1060+
deleteGcsTemporaryDirectory(configuration, bucket, runId);
1061+
return;
1062+
}
1063+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
1064+
1065+
try {
1066+
BigQueryUtil.deleteGcsBucket(storage, bucket);
1067+
} catch (Exception e) {
1068+
LOG.warn("Failed to delete GCS bucket '{}': {}", bucket, e.getMessage(), e);
1069+
}
1070+
}
10071071
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.slf4j.Logger;
6969
import org.slf4j.LoggerFactory;
7070

71+
import java.io.IOException;
7172
import java.time.DateTimeException;
7273
import java.time.LocalDate;
7374
import java.util.List;
@@ -143,7 +144,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
143144
try {
144145
credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
145146
} catch (Exception e) {
146-
String errorReason = "Unable to load service account credentials.";
147+
String errorReason = "Unable to load service account credentials: ";
147148
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
148149
.withStacktrace(e.getStackTrace());
149150
collector.getOrThrowException();
@@ -178,7 +179,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
178179
dataset, config.getBucket());
179180

180181
// Configure GCS Bucket to use
181-
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);;
182+
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
182183
String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,
183184
bucketPath, cmekKeyName);
184185

@@ -240,7 +241,13 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
240241

241242
@Override
242243
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
243-
BigQuerySourceUtils.deleteGcsTemporaryDirectory(configuration, config.getBucket(), bucketPath);
244+
try {
245+
Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
246+
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
247+
BigQuerySourceUtils.cleanupGcsBucket(configuration, bucketPath, config.getBucket(), storage);
248+
} catch (IOException e) {
249+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
250+
}
244251
BigQuerySourceUtils.deleteBigQueryTemporaryTable(configuration, config);
245252
}
246253

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.kms.v1.CryptoKeyName;
2626
import com.google.cloud.storage.Storage;
2727
import com.google.cloud.storage.StorageException;
28+
import com.google.common.base.Strings;
2829
import io.cdap.cdap.api.exception.ErrorCategory;
2930
import io.cdap.cdap.api.exception.ErrorCodeType;
3031
import io.cdap.cdap.api.exception.ErrorType;
@@ -174,15 +175,19 @@ public static String getTemporaryGcsPath(String bucket, String pathPrefix, Strin
174175
* @param configuration Hadoop Configuration.
175176
* @param config BigQuery source configuration.
176177
*/
177-
public static void deleteBigQueryTemporaryTable(Configuration configuration, BigQuerySourceConfig config) {
178+
public static void deleteBigQueryTemporaryTable(Configuration configuration,
179+
BigQuerySourceConfig config) {
178180
String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
179181
try {
180182
Credentials credentials = getCredentials(config.getConnection());
181-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
183+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials,
184+
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
182185
bigQuery.delete(TableId.of(config.getDatasetProject(), config.getDataset(), temporaryTable));
183186
LOG.debug("Deleted temporary table '{}'", temporaryTable);
184187
} catch (IOException e) {
185-
LOG.error("Failed to load service account credentials: {}", e.getMessage(), e);
188+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
189+
} catch (Exception e) {
190+
LOG.warn("Failed to delete temporary BQ table: '{}': {}", temporaryTable, e.getMessage(), e);
186191
}
187192
}
188193

@@ -194,8 +199,7 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
194199
* @param runId the run ID
195200
*/
196201
public static void deleteGcsTemporaryDirectory(Configuration configuration,
197-
String bucket,
198-
String runId) {
202+
@Nullable String bucket, String runId) {
199203
String gcsPath;
200204
// If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
201205
if (bucket == null) {
@@ -206,8 +210,27 @@ public static void deleteGcsTemporaryDirectory(Configuration configuration,
206210

207211
try {
208212
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
209-
} catch (IOException e) {
210-
LOG.error("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
213+
} catch (Exception e) {
214+
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
215+
}
216+
}
217+
218+
/**
219+
* Cleanup temporary GCS bucket if created.
220+
*/
221+
public static void cleanupGcsBucket(Configuration configuration, String runId,
222+
@Nullable String bucket, Storage storage) {
223+
if (!Strings.isNullOrEmpty(bucket)) {
224+
// Only need to delete the bucket if it was created for this run
225+
deleteGcsTemporaryDirectory(configuration, bucket, runId);
226+
return;
227+
}
228+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
229+
230+
try {
231+
BigQueryUtil.deleteGcsBucket(storage, bucket);
232+
} catch (Exception e) {
233+
LOG.warn("Failed to delete GCS bucket '{}': {}", bucket, e.getMessage(), e);
211234
}
212235
}
213236
}

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,18 +190,17 @@ public void prepareRun(SQLEngineContext context) throws Exception {
190190
public void onRunFinish(boolean succeeded, SQLEngineContext context) {
191191
super.onRunFinish(succeeded, context);
192192

193-
String gcsPath;
194193
// If the bucket was created for this run, we should delete it.
195194
// Otherwise, just clean the directory within the provided bucket.
196-
if (sqlEngineConfig.getBucket() == null) {
197-
gcsPath = String.format("gs://%s", bucket);
198-
} else {
199-
gcsPath = String.format(BigQuerySinkUtils.GS_PATH_FORMAT, bucket, runId);
200-
}
201195
try {
202-
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
196+
String serviceAccount = sqlEngineConfig.getServiceAccount();
197+
Credentials credentials = serviceAccount == null ?
198+
null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
199+
sqlEngineConfig.isServiceAccountFilePath());
200+
Storage storage = GCPUtils.getStorage(sqlEngineConfig.getProject(), credentials);
201+
BigQuerySinkUtils.cleanupGcsBucket(configuration, runId, sqlEngineConfig.getBucket(), storage);
203202
} catch (IOException e) {
204-
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
203+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
205204
}
206205
}
207206

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.gcp.bigquery.util;
1818

1919
import com.google.api.client.googleapis.media.MediaHttpUploader;
20+
import com.google.api.gax.paging.Page;
2021
import com.google.cloud.bigquery.BigQuery;
2122
import com.google.cloud.bigquery.BigQueryException;
2223
import com.google.cloud.bigquery.Dataset;
@@ -30,6 +31,9 @@
3031
import com.google.cloud.bigquery.TimePartitioning;
3132
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
3233
import com.google.cloud.kms.v1.CryptoKeyName;
34+
import com.google.cloud.storage.Blob;
35+
import com.google.cloud.storage.BlobId;
36+
import com.google.cloud.storage.Storage;
3337
import com.google.common.annotations.VisibleForTesting;
3438
import com.google.common.base.Strings;
3539
import com.google.common.collect.ImmutableMap;
@@ -713,6 +717,26 @@ public static void deleteTemporaryDirectory(Configuration configuration, String
713717
}
714718
}
715719

720+
/**
721+
* Deletes the GCS bucket.
722+
*/
723+
public static void deleteGcsBucket(Storage storage, String bucket) {
724+
Page<Blob> blobs = storage.list(bucket, Storage.BlobListOption.versions(true));
725+
List<BlobId> blobIds = new ArrayList<>();
726+
for (Blob blob : blobs.iterateAll()) {
727+
blobIds.add(blob.getBlobId());
728+
if (blobIds.size() == 100) {
729+
storage.delete(blobIds); // Batch delete
730+
blobIds.clear();
731+
}
732+
}
733+
if (!blobIds.isEmpty()) {
734+
storage.delete(blobIds);
735+
}
736+
storage.delete(bucket);
737+
LOG.debug("Deleted GCS bucket '{}'.", bucket);
738+
}
739+
716740
public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
717741
String partitionFromDate,
718742
String partitionToDate) {

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.cloud.storage.Storage;
3535
import com.google.cloud.storage.StorageException;
3636
import com.google.cloud.storage.StorageOptions;
37+
import com.google.common.collect.ImmutableMap;
3738
import com.google.gson.reflect.TypeToken;
3839
import io.cdap.plugin.gcp.gcs.GCSPath;
3940
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
@@ -304,8 +305,12 @@ public static void createBucket(Storage storage, String bucket, @Nullable String
304305
if (cmekKeyName != null) {
305306
builder.setDefaultKmsKeyName(cmekKeyName.toString());
306307
}
308+
// Add label to indicate bucket is created by cdap
309+
builder.setLabels(
310+
new ImmutableMap.Builder<String, String>().put("created_by", "cdap").build());
307311
storage.create(builder.build());
308312
}
313+
309314
/**
310315
* Formats a string as a component of a Fully-Qualified Name (FQN).
311316
*

0 commit comments

Comments
 (0)