+ List<Throwable> causalChain = Throwables.getCausalChain(e);
+ for (Throwable t : causalChain) {
+ if (t instanceof ProgramFailureException) {
+ return (ProgramFailureException) t;
+ }
+ if (t instanceof HttpResponseException) {
+ return getProgramFailureException((HttpResponseException) t);
+ }
+ }
+
+ return null;
+ }
+}
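
The helper above walks Guava's causal chain so callers only hand it the outermost exception. A minimal sketch of a call site, assuming a wrapper that wants to surface a buried CDAP ProgramFailureException (the committer and jobContext names are illustrative):

    try {
      committer.commitJob(jobContext);
    } catch (IOException e) {
      // Unwrap the Hadoop/Spark wrapper; fall back to the original exception
      // when no known type is found on the causal chain (null return above).
      ProgramFailureException failure = ExceptionUtils.getProgramFailureException(e);
      throw failure != null ? failure : new RuntimeException(e);
    }
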
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
index 0a64c52dd9..4cedc83c8e 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
@@ -21,6 +21,10 @@
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;
@@ -50,13 +54,20 @@ public AccessToken getAccessToken() {
}
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ "Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
}
}
@Override
public void refresh() throws IOException {
- getCredentials().refresh();
+ try {
+ getCredentials().refresh();
+ } catch (IOException e) {
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ "Unable to refresh service account access token.", e.getMessage(),
+ ErrorType.UNKNOWN, true, e);
+ }
}
private GoogleCredentials getCredentials() throws IOException {
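
Both catch blocks above use the ErrorUtils call shape that recurs throughout this patch. A commented sketch of that shape; the parameter roles are inferred from the call sites, not from the ErrorUtils javadoc:

    throw ErrorUtils.getProgramFailureException(
        new ErrorCategory(ErrorCategoryEnum.PLUGIN), // coarse category for error summaries
        "Unable to refresh service account access token.", // user-facing message
        e.getMessage(),    // low-level reason taken from the cause
        ErrorType.UNKNOWN, // attribution: USER, SYSTEM, or UNKNOWN
        true,              // treated as coming from an external dependency
        e);                // original cause, preserved for the stack trace
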
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
index 72d9dd0e69..b5ba33187d 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
@@ -16,6 +16,8 @@
package io.cdap.plugin.gcp.gcs.sink;
+import io.cdap.cdap.api.exception.ErrorDetailsProvider;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +43,8 @@
*
* Delegated instances are created based on a supplied Output Format and Destination Table Names.
*/
-public class DelegatingGCSOutputCommitter extends OutputCommitter {
+public class DelegatingGCSOutputCommitter extends OutputCommitter implements
+ ErrorDetailsProvider<Void> {
private final TaskAttemptContext taskAttemptContext;
private boolean firstTable = true;
@@ -244,4 +247,8 @@ private String getPendingDirPath(JobID jobId) {
return String.format("%s_%s", FileOutputCommitter.PENDING_DIR_NAME, jobId);
}
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
}
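
DelegatingGCSOutputCommitter is the first of several classes in this patch to implement ErrorDetailsProvider, which lets the framework ask the failing component itself to translate a raw Throwable. Judging by the overrides (Void for committers, writers, and readers; Configuration for formats), the interface is generic in its configuration argument, roughly:

    // Reconstructed shape, inferred from the overrides in this patch;
    // not copied from the CDAP sources.
    public interface ErrorDetailsProvider<T> {
      RuntimeException getExceptionDetails(Throwable throwable, T conf);
    }
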
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
index 2fe4eeff03..ac559825a5 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
@@ -17,6 +17,8 @@
package io.cdap.plugin.gcp.gcs.sink;
import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorDetailsProvider;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -32,7 +34,8 @@
/**
* Output Format used to handle Schemaless Records as input.
*/
-public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
+public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> implements
+ ErrorDetailsProvider<Configuration> {
public static final String PARTITION_FIELD = "delegating_output_format.partition.field";
public static final String DELEGATE_CLASS = "delegating_output_format.delegate";
@@ -75,4 +78,9 @@ public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext contex
return new DelegatingGCSOutputCommitter(context);
}
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Configuration conf) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
+
}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
index 3bd6a13cc0..0414345801 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
@@ -17,6 +17,8 @@
package io.cdap.plugin.gcp.gcs.sink;
import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorDetailsProvider;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -31,7 +33,8 @@
*
* This Record Writer will initialize record writers and Output Committers as needed.
*/
-public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
+public class DelegatingGCSRecordWriter extends
+ RecordWriter<NullWritable, StructuredRecord> implements ErrorDetailsProvider<Void> {
private final TaskAttemptContext context;
private final String partitionField;
private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
@@ -78,4 +81,8 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc
}
}
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
index 129b6a297f..c55f5e0f57 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
@@ -121,26 +121,42 @@ public void prepareRun(BatchSinkContext context) throws Exception {
collector.addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
collector.getOrThrowException();
- return;
}
- Credentials credentials = config.connection.getServiceAccount() == null ?
- null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
+
+ Credentials credentials = null;
+ try {
+ credentials = config.connection.getServiceAccount() == null ?
+ null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+ isServiceAccountFilePath);
+ } catch (Exception e) {
+ String errorReason = "Unable to load service account credentials.";
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+
+ String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
+ String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
+ String correctiveAction = "Ensure you entered the correct bucket path and "
+ + "have permissions for it.";
Bucket bucket;
- String location;
+ String location = null;
try {
- bucket = storage.get(config.getBucket());
+ bucket = storage.get(bucketName);
+ if (bucket != null) {
+ location = bucket.getLocation();
+ } else {
+ location = config.getLocation();
+ GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
+ }
} catch (StorageException e) {
- throw new RuntimeException(
- String.format("Unable to access or create bucket %s. ", config.getBucket())
- + "Ensure you entered the correct bucket path and have permissions for it.", e);
- }
- if (bucket != null) {
- location = bucket.getLocation();
- } else {
- GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
- location = config.getLocation();
+ String errorReason = String.format(errorReasonFormat, e.getCode());
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
}
+
this.outputPath = getOutputDir(context);
// create asset for lineage
asset = Asset.builder(config.getReferenceName())
@@ -532,8 +548,15 @@ public void validateContentType(FailureCollector failureCollector) {
}
}
- public String getBucket() {
- return GCSPath.from(path).getBucket();
+ public String getBucket(FailureCollector collector) {
+ try {
+ return GCSPath.from(path).getBucket();
+ } catch (IllegalArgumentException e) {
+ collector.addFailure(e.getMessage(), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+ return null;
}
@Override
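
The prepareRun and getBucket changes above replace thrown RuntimeExceptions with the FailureCollector idiom: add a failure with a reason and a corrective action, attach the stack trace, and let getOrThrowException() abort with everything collected. The recurring pattern, sketched (riskyValidation is a placeholder):

    try {
      riskyValidation();
    } catch (Exception e) {
      collector.addFailure("Reason shown to the user. " + e.getMessage(),
                           "Suggested corrective action.")
          .withStacktrace(e.getStackTrace());
      // Throws a ValidationException carrying every failure added so far,
      // which is why code after this call may assume the value is present.
      collector.getOrThrowException();
    }
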
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
index 7b15839c10..77e4c86f24 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
@@ -129,33 +129,47 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
config.validate(collector, context.getArguments().asMap());
collector.getOrThrowException();
- Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
- config.getPath(), new HashMap<>());
Map argumentCopy = new HashMap<>(context.getArguments().asMap());
-
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();
+
Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
if (isServiceAccountFilePath == null) {
- context.getFailureCollector().addFailure("Service account type is undefined.",
- "Must be `filePath` or `JSON`");
- context.getFailureCollector().getOrThrowException();
- return;
+ collector.addFailure("Service account type is undefined.",
+ "Must be `filePath` or `JSON`");
+ collector.getOrThrowException();
}
- Credentials credentials = config.connection.getServiceAccount() == null ?
- null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
+
+ Credentials credentials = null;
+ try {
+ credentials = config.connection.getServiceAccount() == null ?
+ null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+ isServiceAccountFilePath);
+ } catch (Exception e) {
+ String errorReason = "Unable to load service account credentials.";
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+
+ String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
+ String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
+ String correctiveAction = "Ensure you entered the correct bucket path and "
+ + "have permissions for it.";
try {
- if (storage.get(config.getBucket()) == null) {
- GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
+ if (storage.get(bucketName) == null) {
+ GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
}
} catch (StorageException e) {
- // Add more descriptive error message
- throw new RuntimeException(
- String.format("Unable to access or create bucket %s. ", config.getBucket())
- + "Ensure you entered the correct bucket path and have permissions for it.", e);
+ String errorReason = String.format(errorReasonFormat, e.getCode());
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
}
+ Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
+ config.getPath(), new HashMap<>());
if (config.getAllowFlexibleSchema()) {
//Configure MultiSink with support for flexible schemas.
configureSchemalessMultiSink(context, baseProperties, argumentCopy);
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java
index cccef18ad9..d3e2f4ba0f 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java
@@ -21,6 +21,8 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
+import io.cdap.cdap.api.exception.ErrorDetailsProvider;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.StorageClient;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +42,8 @@
/**
* OutputCommitter for GCS
*/
-public class GCSOutputCommitter extends OutputCommitter {
+public class GCSOutputCommitter extends OutputCommitter implements
+ ErrorDetailsProvider<Void> {
private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
@@ -161,4 +164,9 @@ public boolean isRecoverySupported() {
public void recoverTask(TaskAttemptContext taskContext) throws IOException {
delegate.recoverTask(taskContext);
}
+
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
index 1d57e6d28e..42e2db120a 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
@@ -1,8 +1,26 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package io.cdap.plugin.gcp.gcs.sink;
import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.etl.api.validation.FormatContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -49,7 +67,8 @@ public Map<String, String> getOutputFormatConfiguration() {
/**
* OutputFormat for GCS Sink
*/
- public static class GCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
+ public static class GCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord>
+ implements ErrorDetailsProvider<Configuration> {
private OutputFormat<NullWritable, StructuredRecord> delegateFormat;
private OutputFormat<NullWritable, StructuredRecord> getDelegateFormatInstance(Configuration configuration) throws IOException {
@@ -89,12 +108,18 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
.getOutputCommitter(taskAttemptContext);
return new GCSOutputCommitter(delegateCommitter);
}
+
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Configuration configuration) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
}
/**
* RecordWriter for GCSSink
*/
- public static class GCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
+ public static class GCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord>
+ implements ErrorDetailsProvider<Void> {
private final RecordWriter<NullWritable, StructuredRecord> originalWriter;
private long recordCount;
@@ -117,5 +142,10 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException, Int
taskAttemptContext.getConfiguration()
.setLong(String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID()), recordCount);
}
+
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
}
}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
index f4f5d6caca..8387d9b30c 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
@@ -18,6 +18,8 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ErrorDetailsProvider;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -34,7 +36,8 @@
/**
* An OutputFormat that filters records before sending them to a delegate
*/
-public class RecordFilterOutputFormat extends OutputFormat<NullWritable, StructuredRecord> implements
+public class RecordFilterOutputFormat extends OutputFormat<NullWritable, StructuredRecord> implements
+ ErrorDetailsProvider<Configuration> {
public static final String FILTER_FIELD = "record.filter.field";
public static final String PASS_VALUE = "record.filter.val";
public static final String ORIGINAL_SCHEMA = "record.original.schema";
@@ -93,10 +96,16 @@ private OutputFormat<NullWritable, StructuredRecord> getDelegateFormat(Configuration hConf) throws IOException {
}
}
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Configuration conf) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
+
/**
* Filters records before writing them out using a delegate.
*/
- public static class FilterRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
+ public static class FilterRecordWriter extends RecordWriter<NullWritable, StructuredRecord>
+ implements ErrorDetailsProvider<Void> {
private final String filterField;
private final String passthroughValue;
private final RecordWriter<NullWritable, StructuredRecord> delegate;
@@ -132,5 +141,10 @@ public void write(NullWritable key, StructuredRecord record) throws IOException,
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
delegate.close(context);
}
+
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
}
}
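
RecordFilterOutputFormat is configured entirely through the Hadoop Configuration keys declared above. A hedged sketch of how a multi-sink might wire it up; the field and value are illustrative:

    Configuration conf = new Configuration();
    conf.set(RecordFilterOutputFormat.FILTER_FIELD, "tablename");
    conf.set(RecordFilterOutputFormat.PASS_VALUE, "customers");
    // FilterRecordWriter forwards records whose "tablename" field equals
    // "customers" to the delegate writer and drops everything else.
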
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSInputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSInputFormatProvider.java
new file mode 100644
index 0000000000..308b245fc0
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSInputFormatProvider.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.source;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ErrorDetailsProvider;
+import io.cdap.cdap.etl.api.validation.FormatContext;
+import io.cdap.cdap.etl.api.validation.InputFiles;
+import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * InputFormatProvider for GCSSource.
+ */
+public class GCSInputFormatProvider implements ValidatingInputFormat {
+ public static final String DELEGATE_INPUTFORMAT_CLASSNAME =
+ "gcssource.delegate.inputformat.classname";
+ private final ValidatingInputFormat delegate;
+
+ public GCSInputFormatProvider(ValidatingInputFormat delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void validate(FormatContext context) {
+ delegate.validate(context);
+ }
+
+ @Override
+ public @Nullable Schema getSchema(FormatContext formatContext) {
+ return delegate.getSchema(formatContext);
+ }
+
+ @Override
+ public @Nullable Schema detectSchema(FormatContext context, InputFiles inputFiles)
+ throws IOException {
+ return ValidatingInputFormat.super.detectSchema(context, inputFiles);
+ }
+
+ @Override
+ public String getInputFormatClassName() {
+ return GCSInputFormat.class.getName();
+ }
+
+ @Override
+ public Map<String, String> getInputFormatConfiguration() {
+ Map<String, String> inputFormatConfiguration =
+ new HashMap<>(delegate.getInputFormatConfiguration());
+ inputFormatConfiguration.put(DELEGATE_INPUTFORMAT_CLASSNAME,
+ delegate.getInputFormatClassName());
+ return inputFormatConfiguration;
+ }
+
+ /**
+ * InputFormat for GCSSource.
+ */
+ public static class GCSInputFormat extends InputFormat<NullWritable, StructuredRecord>
+ implements ErrorDetailsProvider<Configuration> {
+ private InputFormat<NullWritable, StructuredRecord> delegateFormat;
+
+ private InputFormat<NullWritable, StructuredRecord> getDelegateFormatInstance(Configuration configuration) throws IOException {
+ if (delegateFormat != null) {
+ return delegateFormat;
+ }
+
+ String delegateClassName = configuration.get(DELEGATE_INPUTFORMAT_CLASSNAME);
+ try {
+ delegateFormat = (InputFormat<NullWritable, StructuredRecord>) ReflectionUtils
+ .newInstance(configuration.getClassByName(delegateClassName), configuration);
+ return delegateFormat;
+ } catch (ClassNotFoundException e) {
+ throw new IOException(
+ String.format("Unable to instantiate output format for class %s", delegateClassName),
+ e);
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ return getDelegateFormatInstance(jobContext.getConfiguration()).getSplits(jobContext);
+ }
+
+ @Override
+ public RecordReader<NullWritable, StructuredRecord> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ RecordReader<NullWritable, StructuredRecord> originalReader =
+ getDelegateFormatInstance(taskAttemptContext.getConfiguration())
+ .createRecordReader(inputSplit, taskAttemptContext);
+ return new GCSRecordReader(originalReader);
+ }
+
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Configuration configuration) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
+ }
+
+ /**
+ * RecordReader for GCSSource.
+ */
+ public static class GCSRecordReader extends RecordReader<NullWritable, StructuredRecord>
+ implements ErrorDetailsProvider<Void> {
+
+ private final RecordReader<NullWritable, StructuredRecord> originalReader;
+
+ public GCSRecordReader(RecordReader<NullWritable, StructuredRecord> originalReader) {
+ this.originalReader = originalReader;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ originalReader.initialize(inputSplit, taskAttemptContext);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return originalReader.nextKeyValue();
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return originalReader.getCurrentKey();
+ }
+
+ @Override
+ public StructuredRecord getCurrentValue() throws IOException, InterruptedException {
+ return originalReader.getCurrentValue();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return originalReader.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ originalReader.close();
+ }
+
+ @Override
+ public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
+ return ExceptionUtils.getProgramFailureException(throwable);
+ }
+ }
+}
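
The new provider mirrors the sink's delegate pattern: the concrete input format's class name travels through the Hadoop Configuration under DELEGATE_INPUTFORMAT_CLASSNAME, is re-instantiated reflectively on the worker, and its RecordReader is wrapped so the wrapper can also serve as the ErrorDetailsProvider. A sketch of the round trip; the delegate class name is hypothetical:

    Configuration conf = new Configuration();
    // Driver side: GCSInputFormatProvider records the real format to use.
    conf.set(GCSInputFormatProvider.DELEGATE_INPUTFORMAT_CLASSNAME,
        "io.cdap.plugin.format.avro.input.PathTrackingAvroInputFormat"); // hypothetical
    // Worker side: getSplits/createRecordReader look the class up again via
    // ReflectionUtils and wrap its reader in GCSRecordReader.
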
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
index 7bfec4dd81..47533f796d 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
@@ -33,6 +33,7 @@
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
+import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
@@ -73,6 +74,12 @@ public GCSSource(GCSSourceConfig config) {
this.config = config;
}
+ @Override
+ public ValidatingInputFormat getValidatingInputFormat(PipelineConfigurer pipelineConfigurer) {
+ ValidatingInputFormat delegate = super.getValidatingInputFormat(pipelineConfigurer);
+ return new GCSInputFormatProvider(delegate);
+ }
+
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
@@ -83,25 +90,62 @@ protected String getEmptyInputFormatClassName() {
return GCSEmptyInputFormat.class.getName();
}
+ @Override
+ public ValidatingInputFormat getInputFormatForRun(BatchSourceContext context)
+ throws InstantiationException {
+ ValidatingInputFormat inputFormatForRun = super.getInputFormatForRun(context);
+ return new GCSInputFormatProvider(inputFormatForRun);
+ }
+
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
// Get location of the source for lineage
- String location;
- String bucketName = GCSPath.from(config.getPath()).getBucket();
- Credentials credentials = config.connection.getServiceAccount() == null ?
- null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
- config.connection.isServiceAccountFilePath());
+ String location = null;
+ String bucketName = null;
+ String path = config.getPath();
+ FailureCollector collector = context.getFailureCollector();
+
+ try {
+ bucketName = GCSPath.from(path).getBucket();
+ } catch (IllegalArgumentException e) {
+ collector.addFailure(e.getMessage(), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+
+ Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
+ if (isServiceAccountFilePath == null) {
+ collector.addFailure("Service account type is undefined.",
+ "Must be `filePath` or `JSON`");
+ collector.getOrThrowException();
+ }
+
+ Credentials credentials = null;
+ try {
+ credentials = config.connection.getServiceAccount() == null ?
+ null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+ isServiceAccountFilePath);
+ } catch (Exception e) {
+ String errorReason = "Unable to load service account credentials.";
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
try {
location = storage.get(bucketName).getLocation();
} catch (StorageException e) {
- throw new RuntimeException(
- String.format("Unable to access bucket %s. ", bucketName)
- + "Ensure you entered the correct bucket path and have permissions for it.", e);
+ String errorReason = String.format("Error code: %s, Unable to access GCS bucket '%s'. ",
+ e.getCode(), bucketName);
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()),
+ "Ensure you entered the correct bucket path and have permissions for it.")
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
}
// create asset for lineage
- String fqn = GCSPath.getFQN(config.getPath());
+ String fqn = GCSPath.getFQN(path);
String referenceName = Strings.isNullOrEmpty(config.getReferenceName())
? ReferenceNames.normalizeFqn(fqn)
: config.getReferenceName();
@@ -142,7 +186,7 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
@Override
protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
- lineageRecorder.recordRead("Read", String.format("Read%sfrom Google Cloud Storage.",
- config.isEncrypted() ? " and decrypt " : " "), outputFields);
+ lineageRecorder.recordRead("Read", String.format("Read %sfrom Google Cloud Storage.",
+ config.isEncrypted() ? "and decrypt " : ""), outputFields);
}
@@ -257,7 +301,7 @@ public void validate(FailureCollector collector) {
@Override
public String getPath() {
- return path;
+ return GCSPath.from(path).toString();
}
@Nullable
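
Several of the changes above rely on GCSPath.from rejecting malformed paths with IllegalArgumentException, which prepareRun now routes through the FailureCollector instead of letting it escape as an unclassified RuntimeException. A small sketch of the assumed behavior; the invalid path is illustrative:

    try {
      String bucket = GCSPath.from("gs://Invalid_Bucket!/dir").getBucket();
    } catch (IllegalArgumentException e) {
      // Reported via collector.addFailure(...).withStacktrace(...) in prepareRun.
      System.out.println("Invalid GCS path: " + e.getMessage());
    }
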
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
index f6092f66f5..84165cff3c 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
@@ -24,6 +24,10 @@
import com.google.crypto.tink.StreamingAead;
import com.google.crypto.tink.config.TinkConfig;
import com.google.crypto.tink.integration.gcpkms.GcpKmsClient;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.crypto.Decryptor;
import io.cdap.plugin.gcp.crypto.FSInputSeekableByteChannel;
import org.apache.hadoop.conf.Configurable;
@@ -66,20 +70,22 @@ public TinkDecryptor() throws GeneralSecurityException {
@Override
public SeekableByteChannel open(FileSystem fs, Path path, int bufferSize) throws IOException {
DecryptInfo decryptInfo = getDecryptInfo(fs, path);
+ Path metadataPath = new Path(path.getParent(), path.getName() + metadataSuffix);
if (decryptInfo == null) {
- throw new IllegalArgumentException("Missing encryption metadata for file '" + path
- + "'. Expected metadata path is '"
- + new Path(path.getParent(), path.getName() + metadataSuffix) + "'");
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Missing encryption metadata for file '%s'. "
+ + "Expected metadata path is '%s'.", path, metadataPath), null,
+ ErrorType.USER, false, null);
}
try {
StreamingAead streamingAead = decryptInfo.getKeysetHandle().getPrimitive(StreamingAead.class);
return streamingAead.newSeekableDecryptingChannel(new FSInputSeekableByteChannel(fs, path, bufferSize),
decryptInfo.getAad());
- } catch (IOException e) {
- throw e;
} catch (Exception e) {
- throw new IOException(e);
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.",
+ path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e);
}
}
@@ -88,7 +94,9 @@ public void setConf(Configuration configuration) {
this.configuration = configuration;
this.metadataSuffix = configuration.get(METADATA_SUFFIX);
if (metadataSuffix == null) {
- throw new IllegalArgumentException("Missing configuration '" + METADATA_SUFFIX + "'");
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Missing configuration '%s'.", METADATA_SUFFIX), null,
+ ErrorType.USER, false, null);
}
}
@@ -112,7 +120,13 @@ private DecryptInfo getDecryptInfo(FileSystem fs, Path path) throws IOException
}
// Create the DecryptInfo
- return getDecryptInfo(metadata);
+ try {
+ return getDecryptInfo(metadata);
+ } catch (Exception e) {
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.",
+ path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e);
+ }
}
static DecryptInfo getDecryptInfo(JSONObject metadata) throws IOException {