Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
Expand All @@ -43,6 +46,7 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -90,18 +94,32 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
Credentials credentials = serviceAccount == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
String project = config.getProject();
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
FailureCollector collector = context.getFailureCollector();
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();
baseConfiguration = getBaseConfiguration(cmekKeyName);

// Get required dataset ID and dataset instance (if it exists)
DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
Dataset dataset = bigQuery.getDataset(datasetId);
Dataset dataset;
try {
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
dataset = bigQuery.getDataset(datasetId);
} catch (Exception e) {
ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.WRITING));
throw ex == null ? e : ex;
}

// Get the required bucket name and bucket instance (if it exists)
Storage storage = GCPUtils.getStorage(project, credentials);
Storage storage;
try {
storage = GCPUtils.getStorage(project, credentials);;
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.WRITING));
throw ex == null ? e : ex;
}
String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(),
dataset, config.getBucket());
bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,10 @@ private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullabl
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(),
e.getMessage(), pair.getCorrectiveAction(), GCPUtils.BQ_SUPPORTED_DOC_URL);
String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(),
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal,
pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()),
GCPUtils.BQ_SUPPORTED_DOC_URL, e);
}
Expand Down Expand Up @@ -249,8 +251,10 @@ private static void createBucket(Storage storage, String bucket, @Nullable Strin
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(),
e.getMessage(), pair.getCorrectiveAction(), GCPUtils.GCS_SUPPORTED_DOC_URL);
String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(),
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal,
pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()),
GCPUtils.GCS_SUPPORTED_DOC_URL, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand All @@ -48,7 +49,9 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
Expand All @@ -60,6 +63,7 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
Expand Down Expand Up @@ -147,9 +151,16 @@ public void prepareRun(BatchSourceContext context) throws Exception {
collector.getOrThrowException();
}

BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
BigQuery bigQuery;
Dataset dataset;
try {
bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
} catch (Exception e) {
ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}

// Get Configuration for this run
bucketPath = UUID.randomUUID().toString();
Expand All @@ -169,10 +180,18 @@ public void prepareRun(BatchSourceContext context) throws Exception {
dataset, config.getBucket());

// Configure GCS Bucket to use
Storage storage;
try {
storage = GCPUtils.getStorage(config.getProject(), credentials);;
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
String bucket = null;
try {
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath,
cmekKeyName);
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,
bucketPath, cmekKeyName);
} catch (Exception e) {
String errorReason = "Failed to create bucket.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* A custom ErrorDetailsProvider for GCP plugins.
*/
public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s";

/**
* Get a ProgramFailureException with the given error
Expand Down Expand Up @@ -71,12 +72,12 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
* @param e The HttpResponseException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(HttpResponseException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
Integer statusCode = e.getStatusCode();
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(),
pair.getCorrectiveAction());
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";

String errorMessage = e.getMessage();
String externalDocumentationLink = null;
Expand All @@ -95,7 +96,8 @@ private ProgramFailureException getProgramFailureException(HttpResponseException
}

return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorReason, String.format(errorMessageFormat, errorContext.getPhase(), errorMessage),
errorReason, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage),
pair.getErrorType(), true, ErrorCodeType.HTTP, statusCode.toString(),
externalDocumentationLink, e);
}
Expand All @@ -122,11 +124,12 @@ private String getErrorMessage(GoogleJsonResponseException exception) {
* @param e The IllegalArgumentException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(IllegalArgumentException e,
ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e);
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage), ErrorType.USER, false, e);
}

/**
Expand All @@ -136,11 +139,12 @@ private ProgramFailureException getProgramFailureException(IllegalArgumentExcept
* @param e The IllegalStateException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(IllegalStateException e,
ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage), ErrorType.SYSTEM, false, e);
}

/**
Expand Down
18 changes: 14 additions & 4 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
Expand Down Expand Up @@ -138,10 +141,14 @@ public void prepareRun(BatchSinkContext context) throws Exception {
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
Bucket bucket;
String location = null;
try {
Expand All @@ -153,6 +160,9 @@ public void prepareRun(BatchSinkContext context) throws Exception {
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
}
} catch (StorageException e) {
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
Expand Down
20 changes: 15 additions & 5 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
import io.cdap.cdap.api.plugin.PluginProperties;
Expand All @@ -41,7 +42,9 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.format.FileFormat;
Expand Down Expand Up @@ -126,7 +129,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
}

@Override
public void prepareRun(BatchSinkContext context) throws IOException, InstantiationException {
public void prepareRun(BatchSinkContext context) throws Exception {
FailureCollector collector = context.getFailureCollector();
config.validate(collector, context.getArguments().asMap());
collector.getOrThrowException();
Expand Down Expand Up @@ -156,15 +159,22 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
try {
if (storage.get(bucketName) == null) {
GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
}
} catch (StorageException e) {
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
Expand Down Expand Up @@ -118,7 +121,14 @@ public void prepareRun(BatchSourceContext context) throws Exception {
collector.getOrThrowException();
}

Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
String location = null;
try {
// Get location of the source for lineage
Expand Down
Loading