Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
Expand Down Expand Up @@ -88,13 +86,18 @@ public abstract class AbstractBigQuerySink extends BatchSink<StructuredRecord, S
@Override
public final void prepareRun(BatchSinkContext context) throws Exception {
prepareRunValidation(context);

FailureCollector collector = context.getFailureCollector();
Credentials credentials = null;
AbstractBigQuerySinkConfig config = getConfig();
String serviceAccount = config.getServiceAccount();
Credentials credentials = serviceAccount == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
try {
credentials = BigQuerySinkUtils.getCredentials(config.getConnection());
} catch (Exception e) {
String errorReason = "Unable to load service account credentials: ";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
String project = config.getProject();
FailureCollector collector = context.getFailureCollector();
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();
baseConfiguration = getBaseConfiguration(cmekKeyName);
Expand Down Expand Up @@ -143,17 +146,13 @@ public final void prepareRun(BatchSinkContext context) throws Exception {

@Override
public void onRunFinish(boolean succeeded, BatchSinkContext context) {
String gcsPath;
String bucket = getConfig().getBucket();
if (bucket == null) {
gcsPath = String.format("gs://%s", runUUID);
} else {
gcsPath = String.format(gcsPathFormat, bucket, runUUID);
}
try {
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
Credentials credentials = BigQuerySinkUtils.getCredentials(getConfig().getConnection());
Storage storage = GCPUtils.getStorage(getConfig().getProject(), credentials);
BigQuerySinkUtils.cleanupGcsBucket(baseConfiguration, runUUID.toString(),
getConfig().getBucket(), storage);
} catch (IOException e) {
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.cdap.plugin.gcp.bigquery.sink;

import com.google.api.gax.paging.Page;
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Dataset;
Expand All @@ -29,9 +31,11 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import io.cdap.cdap.api.data.schema.Schema;
Expand All @@ -44,6 +48,7 @@
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryOutputConfiguration;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableSchema;
Expand All @@ -54,6 +59,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Type;
Expand All @@ -78,8 +85,12 @@
*/
public final class BigQuerySinkUtils {

private static final Logger LOG = LoggerFactory.getLogger(BigQuerySinkUtils.class);
public static final String GS_PATH_FORMAT = "gs://%s/%s";
private static final String TEMPORARY_BUCKET_FORMAT = GS_PATH_FORMAT + "/input/%s-%s";
private static final String BQ_TEMP_BUCKET_NAME_PREFIX = "bq-sink-bucket-";
private static final String BQ_TEMP_BUCKET_NAME_TEMPLATE = BQ_TEMP_BUCKET_NAME_PREFIX + "%s";
private static final String BQ_TEMP_BUCKET_PATH_TEMPLATE = "gs://" + BQ_TEMP_BUCKET_NAME_TEMPLATE;
private static final String DATETIME = "DATETIME";
private static final String RECORD = "RECORD";
private static final String JSON = "JSON";
Expand Down Expand Up @@ -270,7 +281,7 @@ public static String configureBucket(Configuration baseConfiguration, @Nullable
boolean deleteBucket = false;
// If the bucket is null, assign the run ID as the bucket name and mark the bucket for deletion.
if (bucket == null) {
bucket = runId;
bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
deleteBucket = true;
}
return configureBucket(baseConfiguration, bucket, runId, deleteBucket);
Expand Down Expand Up @@ -1004,4 +1015,57 @@ private static void getJsonStringFieldsFromBQSchema(FieldList fieldList,
path.remove(path.size() - 1);
}
}

/**
* Deletes temporary GCS directory.
*
* @param configuration Hadoop Configuration.
* @param bucket the bucket name
* @param runId the run ID
*/
/**
 * Deletes the temporary GCS directory used by a run.
 *
 * @param configuration Hadoop Configuration.
 * @param bucket the bucket name; null means the bucket was auto-created for this run
 * @param runId the run ID
 */
private static void deleteGcsTemporaryDirectory(Configuration configuration,
                                                @Nullable String bucket, String runId) {
  // When no bucket was supplied, the run used an auto-created temp bucket whose path is
  // derived from the run ID; otherwise the temp directory lives inside the given bucket.
  final String gcsPath = (bucket == null)
      ? String.format(BQ_TEMP_BUCKET_PATH_TEMPLATE, runId)
      : String.format(GS_PATH_FORMAT, bucket, runId);

  try {
    BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
  } catch (Exception e) {
    // Best-effort cleanup: log and continue rather than failing the pipeline run.
    LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
  }
}

/**
 * Returns the service account credentials derived from the connection config,
 * or null when no service account is configured.
 *
 * @param config the BigQuery connection configuration to read the service account from
 * @return loaded credentials, or null if the config has no service account
 * @throws IOException if the service account credentials cannot be loaded
 */
@Nullable
public static Credentials getCredentials(BigQueryConnectorConfig config) throws IOException {
  String serviceAccount = config.getServiceAccount();
  if (serviceAccount == null) {
    return null;
  }
  return GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
}

/**
 * Cleans up the temporary GCS storage used by a run. When the user supplied a bucket, only
 * this run's temporary directory inside it is removed; when no bucket was supplied, the
 * bucket auto-created for the run is deleted entirely.
 *
 * @param configuration Hadoop Configuration.
 * @param runId the run ID
 * @param bucket the user-provided bucket name, or null/empty if one was auto-created
 * @param storage the GCS client used to delete an auto-created bucket
 */
public static void cleanupGcsBucket(Configuration configuration, String runId,
                                    @Nullable String bucket, Storage storage) {
  if (Strings.isNullOrEmpty(bucket)) {
    // No user bucket: the run wrote into an auto-created bucket, so delete the whole bucket.
    String tempBucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
    try {
      BigQueryUtil.deleteGcsBucket(storage, tempBucket);
    } catch (Exception e) {
      // Best-effort cleanup: log and continue rather than failing the pipeline run.
      LOG.warn("Failed to delete GCS bucket '{}': {}", tempBucket, e.getMessage(), e);
    }
  } else {
    // User-provided bucket: remove only this run's temporary directory inside it.
    deleteGcsTemporaryDirectory(configuration, bucket, runId);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.DateTimeException;
import java.time.LocalDate;
import java.util.List;
Expand Down Expand Up @@ -143,7 +144,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
try {
credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
} catch (Exception e) {
String errorReason = "Unable to load service account credentials.";
String errorReason = "Unable to load service account credentials: ";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
Expand Down Expand Up @@ -178,7 +179,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
dataset, config.getBucket());

// Configure GCS Bucket to use
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);;
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,
bucketPath, cmekKeyName);

Expand Down Expand Up @@ -240,7 +241,13 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<

@Override
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
BigQuerySourceUtils.deleteGcsTemporaryDirectory(configuration, config.getBucket(), bucketPath);
try {
Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
BigQuerySourceUtils.cleanupGcsBucket(configuration, bucketPath, config.getBucket(), storage);
} catch (IOException e) {
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
}
BigQuerySourceUtils.deleteBigQueryTemporaryTable(configuration, config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
Expand Down Expand Up @@ -174,15 +175,19 @@ public static String getTemporaryGcsPath(String bucket, String pathPrefix, Strin
* @param configuration Hadoop Configuration.
* @param config BigQuery source configuration.
*/
public static void deleteBigQueryTemporaryTable(Configuration configuration, BigQuerySourceConfig config) {
public static void deleteBigQueryTemporaryTable(Configuration configuration,
BigQuerySourceConfig config) {
String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
try {
Credentials credentials = getCredentials(config.getConnection());
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials,
GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
bigQuery.delete(TableId.of(config.getDatasetProject(), config.getDataset(), temporaryTable));
LOG.debug("Deleted temporary table '{}'", temporaryTable);
} catch (IOException e) {
LOG.error("Failed to load service account credentials: {}", e.getMessage(), e);
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
} catch (Exception e) {
LOG.warn("Failed to delete temporary BQ table: '{}': {}", temporaryTable, e.getMessage(), e);
}
}

Expand All @@ -194,8 +199,7 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
* @param runId the run ID
*/
public static void deleteGcsTemporaryDirectory(Configuration configuration,
String bucket,
String runId) {
@Nullable String bucket, String runId) {
String gcsPath;
// If the bucket was created for this run, build temp path name using the bucket path and delete the entire bucket.
if (bucket == null) {
Expand All @@ -206,8 +210,27 @@ public static void deleteGcsTemporaryDirectory(Configuration configuration,

try {
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
} catch (IOException e) {
LOG.error("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
} catch (Exception e) {
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
}
}

/**
 * Cleans up the temporary GCS storage used by a run. When the user supplied a bucket, only
 * this run's temporary directory inside it is removed; when no bucket was supplied, the
 * bucket auto-created for the run is deleted entirely.
 *
 * @param configuration Hadoop Configuration.
 * @param runId the run ID
 * @param bucket the user-provided bucket name, or null/empty if one was auto-created
 * @param storage the GCS client used to delete an auto-created bucket
 */
public static void cleanupGcsBucket(Configuration configuration, String runId,
                                    @Nullable String bucket, Storage storage) {
  if (Strings.isNullOrEmpty(bucket)) {
    // No user bucket: the run wrote into an auto-created bucket, so delete the whole bucket.
    String tempBucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, runId);
    try {
      BigQueryUtil.deleteGcsBucket(storage, tempBucket);
    } catch (Exception e) {
      // Best-effort cleanup: log and continue rather than failing the pipeline run.
      LOG.warn("Failed to delete GCS bucket '{}': {}", tempBucket, e.getMessage(), e);
    }
  } else {
    // User-provided bucket: remove only this run's temporary directory inside it.
    deleteGcsTemporaryDirectory(configuration, bucket, runId);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,17 @@ public void prepareRun(SQLEngineContext context) throws Exception {
public void onRunFinish(boolean succeeded, SQLEngineContext context) {
super.onRunFinish(succeeded, context);

String gcsPath;
// If the bucket was created for this run, we should delete it.
// Otherwise, just clean the directory within the provided bucket.
if (sqlEngineConfig.getBucket() == null) {
gcsPath = String.format("gs://%s", bucket);
} else {
gcsPath = String.format(BigQuerySinkUtils.GS_PATH_FORMAT, bucket, runId);
}
try {
BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
String serviceAccount = sqlEngineConfig.getServiceAccount();
Credentials credentials = serviceAccount == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
sqlEngineConfig.isServiceAccountFilePath());
Storage storage = GCPUtils.getStorage(sqlEngineConfig.getProject(), credentials);
BigQuerySinkUtils.cleanupGcsBucket(configuration, runId, sqlEngineConfig.getBucket(), storage);
} catch (IOException e) {
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
LOG.warn("Failed to load service account credentials: {}", e.getMessage(), e);
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.cdap.plugin.gcp.bigquery.util;

import com.google.api.client.googleapis.media.MediaHttpUploader;
import com.google.api.gax.paging.Page;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Dataset;
Expand All @@ -30,6 +31,9 @@
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -713,6 +717,26 @@ public static void deleteTemporaryDirectory(Configuration configuration, String
}
}

/**
 * Deletes all objects in the given GCS bucket (listing all object versions) and then deletes
 * the bucket itself.
 *
 * @param storage the GCS client to use
 * @param bucket the name of the bucket to delete
 */
public static void deleteGcsBucket(Storage storage, String bucket) {
  // Delete objects in batches to keep each request bounded in size.
  final int batchSize = 100;
  List<BlobId> pending = new ArrayList<>();
  // versions(true) lists noncurrent object versions as well — NOTE(review): assumes the
  // returned blob IDs identify each listed version for deletion; confirm against the client docs.
  Page<Blob> blobs = storage.list(bucket, Storage.BlobListOption.versions(true));
  for (Blob blob : blobs.iterateAll()) {
    pending.add(blob.getBlobId());
    if (pending.size() >= batchSize) {
      storage.delete(pending); // Batch delete
      pending.clear();
    }
  }
  if (!pending.isEmpty()) {
    storage.delete(pending);
  }
  // The bucket itself can only be removed once it is empty.
  storage.delete(bucket);
  LOG.debug("Deleted GCS bucket '{}'.", bucket);
}

public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
String partitionFromDate,
String partitionToDate) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableMap;
import com.google.gson.reflect.TypeToken;
import io.cdap.plugin.gcp.gcs.GCSPath;
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
Expand Down Expand Up @@ -304,8 +305,12 @@ public static void createBucket(Storage storage, String bucket, @Nullable String
if (cmekKeyName != null) {
builder.setDefaultKmsKeyName(cmekKeyName.toString());
}
// Add label to indicate bucket is created by cdap
builder.setLabels(
new ImmutableMap.Builder<String, String>().put("created_by", "cdap").build());
storage.create(builder.build());
}

/**
* Formats a string as a component of a Fully-Qualified Name (FQN).
*
Expand Down
Loading
Loading