Skip to content

Commit 9acfa88

Browse files
Add support for cleanup of temp gcs buckets after pipeline completion
1 parent d991ce7 commit 9acfa88

File tree

6 files changed

+147
-44
lines changed

6 files changed

+147
-44
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,18 @@ public abstract class AbstractBigQuerySink extends BatchSink<StructuredRecord, S
8888
@Override
8989
public final void prepareRun(BatchSinkContext context) throws Exception {
9090
prepareRunValidation(context);
91-
91+
FailureCollector collector = context.getFailureCollector();
92+
Credentials credentials = null;
9293
AbstractBigQuerySinkConfig config = getConfig();
93-
String serviceAccount = config.getServiceAccount();
94-
Credentials credentials = serviceAccount == null ?
95-
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
94+
try {
95+
credentials = BigQuerySinkUtils.getCredentials(config.getConnection());
96+
} catch (Exception e) {
97+
String errorReason = "Unable to load service account credentials: ";
98+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
99+
.withStacktrace(e.getStackTrace());
100+
collector.getOrThrowException();
101+
}
96102
String project = config.getProject();
97-
FailureCollector collector = context.getFailureCollector();
98103
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
99104
collector.getOrThrowException();
100105
baseConfiguration = getBaseConfiguration(cmekKeyName);
@@ -143,17 +148,14 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
143148

144149
@Override
145150
public void onRunFinish(boolean succeeded, BatchSinkContext context) {
146-
String gcsPath;
147-
String bucket = getConfig().getBucket();
148-
if (bucket == null) {
149-
gcsPath = String.format("gs://%s", runUUID);
150-
} else {
151-
gcsPath = String.format(gcsPathFormat, bucket, runUUID);
152-
}
151+
BigQuerySinkUtils.deleteGcsTemporaryDirectory(baseConfiguration, getConfig().getBucket(),
152+
runUUID.toString());
153153
try {
154-
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
155-
} catch (IOException e) {
156-
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
154+
Credentials credentials = BigQuerySinkUtils.getCredentials(getConfig().getConnection());
155+
Storage storage = GCPUtils.getStorage(getConfig().getProject(), credentials);
156+
BigQuerySinkUtils.deleteGcsBucket(runUUID.toString(), getConfig().getBucket(), storage);
157+
} catch (Exception e) {
158+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
157159
}
158160
}
159161

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.cdap.plugin.gcp.bigquery.sink;
1818

19+
import com.google.api.gax.paging.Page;
20+
import com.google.auth.Credentials;
1921
import com.google.cloud.bigquery.BigQuery;
2022
import com.google.cloud.bigquery.BigQueryException;
2123
import com.google.cloud.bigquery.Dataset;
@@ -29,9 +31,11 @@
2931
import com.google.cloud.bigquery.TableId;
3032
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
3133
import com.google.cloud.kms.v1.CryptoKeyName;
34+
import com.google.cloud.storage.Blob;
3235
import com.google.cloud.storage.Bucket;
3336
import com.google.cloud.storage.Storage;
3437
import com.google.cloud.storage.StorageException;
38+
import com.google.common.base.Strings;
3539
import com.google.common.reflect.TypeToken;
3640
import com.google.gson.Gson;
3741
import io.cdap.cdap.api.data.schema.Schema;
@@ -44,6 +48,7 @@
4448
import io.cdap.cdap.etl.api.validation.ValidationFailure;
4549
import io.cdap.plugin.common.Asset;
4650
import io.cdap.plugin.common.LineageRecorder;
51+
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
4752
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryOutputConfiguration;
4853
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
4954
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableSchema;
@@ -54,6 +59,8 @@
5459
import org.apache.hadoop.conf.Configuration;
5560
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
5661
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
62+
import org.slf4j.Logger;
63+
import org.slf4j.LoggerFactory;
5764

5865
import java.io.IOException;
5966
import java.lang.reflect.Type;
@@ -78,8 +85,12 @@
7885
*/
7986
public final class BigQuerySinkUtils {
8087

88+
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySinkUtils.class);
8189
public static final String GS_PATH_FORMAT = "gs://%s/%s";
8290
private static final String TEMPORARY_BUCKET_FORMAT = GS_PATH_FORMAT + "/input/%s-%s";
91+
private static final String BQ_TEMP_BUCKET_NAME_PREFIX = "bq-sink-bucket-";
92+
private static final String BQ_TEMP_BUCKET_NAME_TEMPLATE = BQ_TEMP_BUCKET_NAME_PREFIX + "%s";
93+
private static final String BQ_TEMP_BUCKET_PATH_TEMPLATE = "gs://" + BQ_TEMP_BUCKET_NAME_TEMPLATE;
8394
private static final String DATETIME = "DATETIME";
8495
private static final String RECORD = "RECORD";
8596
private static final String JSON = "JSON";
@@ -270,7 +281,7 @@ public static String configureBucket(Configuration baseConfiguration, @Nullable
270281
boolean deleteBucket = false;
271282
// If the bucket is null, assign the run ID as the bucket name and mark the bucket for deletion.
272283
if (bucket == null) {
273-
bucket = runId;
284+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
274285
deleteBucket = true;
275286
}
276287
return configureBucket(baseConfiguration, bucket, runId, deleteBucket);
@@ -1004,4 +1015,60 @@ private static void getJsonStringFieldsFromBQSchema(FieldList fieldList,
10041015
path.remove(path.size() - 1);
10051016
}
10061017
}
1018+
1019+
/**
1020+
* Deletes temporary GCS directory.
1021+
*
1022+
* @param configuration Hadoop Configuration.
1023+
* @param bucket the bucket name
1024+
* @param runId the run ID
1025+
*/
1026+
public static void deleteGcsTemporaryDirectory(Configuration configuration,
1027+
@Nullable String bucket, String runId) {
1028+
String gcsPath;
1029+
// If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
1030+
if (bucket == null) {
1031+
gcsPath = String.format(BQ_TEMP_BUCKET_PATH_TEMPLATE, runId);
1032+
} else {
1033+
gcsPath = String.format(GS_PATH_FORMAT, bucket, runId);
1034+
}
1035+
1036+
try {
1037+
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
1038+
} catch (Exception e) {
1039+
LOG.error("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
1040+
}
1041+
}
1042+
1043+
/**
1044+
* Returns the serviceAccountCredentials if present in the config, otherwise null.
1045+
*/
1046+
@Nullable
1047+
public static Credentials getCredentials(BigQueryConnectorConfig config) throws IOException {
1048+
return config.getServiceAccount() == null ?
1049+
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
1050+
config.isServiceAccountFilePath());
1051+
}
1052+
1053+
/**
1054+
* Deletes temporary GCS bucket if created.
1055+
*/
1056+
public static void deleteGcsBucket(String runId, @Nullable String bucket, Storage storage) {
1057+
if (!Strings.isNullOrEmpty(bucket)) {
1058+
// Only need to delete the bucket if it was created for this run
1059+
return;
1060+
}
1061+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
1062+
try {
1063+
// Delete all objects in the bucket
1064+
Page<Blob> blobs = storage.list(bucket, Storage.BlobListOption.versions(true));
1065+
for (Blob blob : blobs.iterateAll()) {
1066+
storage.delete(blob.getBlobId());
1067+
}
1068+
// Delete the bucket
1069+
storage.delete(bucket);
1070+
} catch (Exception e) {
1071+
LOG.error("Failed to delete temporary gcs bucket '{}': {}", bucket, e.getMessage());
1072+
}
1073+
}
10071074
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
143143
try {
144144
credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
145145
} catch (Exception e) {
146-
String errorReason = "Unable to load service account credentials.";
146+
String errorReason = "Unable to load service account credentials: ";
147147
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
148148
.withStacktrace(e.getStackTrace());
149149
collector.getOrThrowException();
@@ -178,7 +178,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
178178
dataset, config.getBucket());
179179

180180
// Configure GCS Bucket to use
181-
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);;
181+
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
182182
String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,
183183
bucketPath, cmekKeyName);
184184

@@ -241,6 +241,7 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
241241
@Override
242242
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
243243
BigQuerySourceUtils.deleteGcsTemporaryDirectory(configuration, config.getBucket(), bucketPath);
244+
BigQuerySourceUtils.deleteGcsBucket(bucketPath, config);
244245
BigQuerySourceUtils.deleteBigQueryTemporaryTable(configuration, config);
245246
}
246247

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616

1717
package io.cdap.plugin.gcp.bigquery.source;
1818

19+
import com.google.api.gax.paging.Page;
1920
import com.google.auth.Credentials;
2021
import com.google.cloud.bigquery.BigQuery;
2122
import com.google.cloud.bigquery.Dataset;
2223
import com.google.cloud.bigquery.DatasetId;
2324
import com.google.cloud.bigquery.TableId;
2425
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
2526
import com.google.cloud.kms.v1.CryptoKeyName;
27+
import com.google.cloud.storage.Blob;
2628
import com.google.cloud.storage.Storage;
2729
import com.google.cloud.storage.StorageException;
30+
import com.google.common.base.Strings;
2831
import io.cdap.cdap.api.exception.ErrorCategory;
2932
import io.cdap.cdap.api.exception.ErrorCodeType;
3033
import io.cdap.cdap.api.exception.ErrorType;
@@ -174,15 +177,19 @@ public static String getTemporaryGcsPath(String bucket, String pathPrefix, Strin
174177
* @param configuration Hadoop Configuration.
175178
* @param config BigQuery source configuration.
176179
*/
177-
public static void deleteBigQueryTemporaryTable(Configuration configuration, BigQuerySourceConfig config) {
180+
public static void deleteBigQueryTemporaryTable(Configuration configuration,
181+
BigQuerySourceConfig config) {
178182
String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
179183
try {
180184
Credentials credentials = getCredentials(config.getConnection());
181-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
185+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials,
186+
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
182187
bigQuery.delete(TableId.of(config.getDatasetProject(), config.getDataset(), temporaryTable));
183188
LOG.debug("Deleted temporary table '{}'", temporaryTable);
184189
} catch (IOException e) {
185-
LOG.error("Failed to load service account credentials: {}", e.getMessage(), e);
190+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
191+
} catch (Exception e) {
192+
LOG.warn("Failed to delete temporary BQ table: '{}': {}", temporaryTable, e.getMessage(), e);
186193
}
187194
}
188195

@@ -194,8 +201,7 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
194201
* @param runId the run ID
195202
*/
196203
public static void deleteGcsTemporaryDirectory(Configuration configuration,
197-
String bucket,
198-
String runId) {
204+
@Nullable String bucket, String runId) {
199205
String gcsPath;
200206
// If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
201207
if (bucket == null) {
@@ -206,8 +212,33 @@ public static void deleteGcsTemporaryDirectory(Configuration configuration,
206212

207213
try {
208214
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
209-
} catch (IOException e) {
210-
LOG.error("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
215+
} catch (Exception e) {
216+
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
217+
}
218+
}
219+
220+
/**
221+
* Deletes temporary GCS bucket if created.
222+
*/
223+
public static void deleteGcsBucket(String runId, BigQuerySourceConfig config) {
224+
String bucket = config.getBucket();
225+
if (!Strings.isNullOrEmpty(bucket)) {
226+
// Only need to delete the bucket if it was created for this run
227+
return;
228+
}
229+
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
230+
try {
231+
Credentials credentials = getCredentials(config.getConnection());
232+
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
233+
// Delete all objects in the bucket
234+
Page<Blob> blobs = storage.list(bucket, Storage.BlobListOption.versions(true));
235+
for (Blob blob : blobs.iterateAll()) {
236+
storage.delete(blob.getBlobId());
237+
}
238+
// Delete the bucket
239+
storage.delete(bucket);
240+
} catch (Exception e) {
241+
LOG.warn("Failed to delete temporary gcs bucket '{}': {}", bucket, e.getMessage());
211242
}
212243
}
213244
}

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,18 +190,19 @@ public void prepareRun(SQLEngineContext context) throws Exception {
190190
public void onRunFinish(boolean succeeded, SQLEngineContext context) {
191191
super.onRunFinish(succeeded, context);
192192

193-
String gcsPath;
194193
// If the bucket was created for this run, we should delete it.
195194
// Otherwise, just clean the directory within the provided bucket.
196-
if (sqlEngineConfig.getBucket() == null) {
197-
gcsPath = String.format("gs://%s", bucket);
198-
} else {
199-
gcsPath = String.format(BigQuerySinkUtils.GS_PATH_FORMAT, bucket, runId);
200-
}
201-
try {
202-
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
195+
BigQuerySinkUtils.deleteGcsTemporaryDirectory(configuration, sqlEngineConfig.getBucket(),
196+
runId);
197+
try{
198+
String serviceAccount = sqlEngineConfig.getServiceAccount();
199+
Credentials credentials = serviceAccount == null ?
200+
null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
201+
sqlEngineConfig.isServiceAccountFilePath());
202+
Storage storage = GCPUtils.getStorage(sqlEngineConfig.getProject(), credentials);
203+
BigQuerySinkUtils.deleteGcsBucket(runId, sqlEngineConfig.getBucket(), storage);
203204
} catch (IOException e) {
204-
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
205+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
205206
}
206207
}
207208

src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@
8181
import io.cdap.plugin.gcp.gcs.StorageClient;
8282
import io.cdap.plugin.gcp.gcs.sink.GCSBatchSink;
8383
import org.apache.hadoop.conf.Configuration;
84-
import org.apache.hadoop.fs.FileSystem;
85-
import org.apache.hadoop.fs.Path;
8684
import org.apache.hadoop.io.NullWritable;
8785
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
8886
import org.slf4j.Logger;
@@ -249,16 +247,19 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
249247
return;
250248
}
251249

252-
Path gcsPath = new Path(DataplexConstants.STORAGE_BUCKET_PATH_PREFIX + runUUID);
250+
BigQuerySinkUtils.deleteGcsTemporaryDirectory(baseConfiguration, null, runUUID.toString());
253251
try {
254-
FileSystem fs = gcsPath.getFileSystem(baseConfiguration);
255-
if (fs.exists(gcsPath)) {
256-
fs.delete(gcsPath, true);
257-
LOG.debug("Deleted temporary directory '{}'", gcsPath);
258-
}
259-
emitMetricsForBigQueryDataset(succeeded, context);
252+
String serviceAccount = config.getServiceAccount();
253+
Credentials credentials = serviceAccount == null ? null
254+
: GCPUtils.loadServiceAccountCredentials(serviceAccount,
255+
config.isServiceAccountFilePath());
256+
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
257+
BigQuerySinkUtils.deleteGcsBucket(runUUID.toString(), null, storage);
260258
} catch (IOException e) {
261-
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
259+
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
260+
}
261+
try {
262+
emitMetricsForBigQueryDataset(succeeded, context);
262263
} catch (Exception exception) {
263264
LOG.warn("Exception while trying to emit metric. No metric will be emitted for the number of affected rows.",
264265
exception);

0 commit comments

Comments
 (0)