format =
DelegatingGCSOutputUtils.getDelegateFormat(context.getConfiguration());
@@ -63,7 +65,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException,
delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, tableName);
//Add record writer to delegate map.
- delegate = format.getRecordWriter(context);
+ delegate = new ForwardingRecordWriter(format.getRecordWriter(context));
delegateMap.put(tableName, delegate);
}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputCommitter.java
new file mode 100644
index 000000000..1c1c66296
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputCommitter.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+
+/**
+ * ForwardingOutputCommitter which delegates all operations to another OutputCommitter.
+ *
+ * This is used to wrap the OutputCommitter of the delegate format and rethrow
+ * {@link io.cdap.cdap.api.exception.ProgramFailureException} in place of IOException.
+ */
+public class ForwardingOutputCommitter extends OutputCommitter {
+ private final OutputCommitter delegate;
+
+ public ForwardingOutputCommitter(OutputCommitter delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ ExceptionUtils.invokeWithProgramFailureHandling(() -> delegate.setupJob(jobContext));
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ ExceptionUtils.invokeWithProgramFailureHandling(() -> delegate.setupTask(taskAttemptContext));
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+ return ExceptionUtils.invokeWithProgramFailureHandling(
+ () -> delegate.needsTaskCommit(taskAttemptContext));
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ ExceptionUtils.invokeWithProgramFailureHandling(() -> delegate.commitTask(taskAttemptContext));
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ ExceptionUtils.invokeWithProgramFailureHandling(() -> delegate.abortTask(taskAttemptContext));
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat, String tableName)
+ throws InterruptedException, IOException {
+ if (delegate instanceof DelegatingGCSOutputCommitter) {
+ ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(() ->
+ ((DelegatingGCSOutputCommitter) delegate).addGCSOutputCommitterFromOutputFormat(
+ outputFormat, tableName));
+ } else {
+ throw ErrorUtils.getProgramFailureException(
+ new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        String.format("Method 'addGCSOutputCommitterFromOutputFormat' is not supported by "
+          + "output committer '%s'.", delegate.getClass().getName()), null, ErrorType.SYSTEM, false, null
+ );
+ }
+ }
+}
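Note: the wrappers above rely on io.cdap.plugin.gcp.common.ExceptionUtils, which this patch does
not show. A minimal sketch of the assumed helper shapes follows, reusing the ErrorUtils factory
exactly as this patch does elsewhere; the class name, the functional interfaces, and the separate
"...AndReturn" method are illustrative, since the repository may overload a single name for the
void- and value-returning forms.

  import io.cdap.cdap.api.exception.ErrorCategory;
  import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
  import io.cdap.cdap.api.exception.ErrorType;
  import io.cdap.cdap.api.exception.ErrorUtils;
  import java.io.IOException;

  // Sketch only: assumed shapes of the ExceptionUtils helpers used by the Forwarding* wrappers.
  public final class ExceptionUtilsSketch {
    private ExceptionUtilsSketch() { }

    @FunctionalInterface
    public interface IORunnable {
      void run() throws IOException;
    }

    @FunctionalInterface
    public interface IOSupplier<T> {
      T get() throws IOException;
    }

    // Runs op, rethrowing any IOException as an unchecked ProgramFailureException.
    public static void invokeWithProgramFailureHandling(IORunnable op) {
      try {
        op.run();
      } catch (IOException e) {
        throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
          e.getMessage(), e.getMessage(), ErrorType.UNKNOWN, false, e);
      }
    }

    // Value-returning form, matching calls such as needsTaskCommit above.
    public static <T> T invokeWithProgramFailureHandlingAndReturn(IOSupplier<T> op) {
      try {
        return op.get();
      } catch (IOException e) {
        throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
          e.getMessage(), e.getMessage(), ErrorType.UNKNOWN, false, e);
      }
    }
  }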
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputFormat.java
new file mode 100644
index 000000000..2574b9d79
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import java.io.IOException;
+
+/**
+ * ForwardingOutputFormat which delegates all operations to another OutputFormat.
+ *
+ * This is used to wrap the delegate output format and rethrow
+ * {@link io.cdap.cdap.api.exception.ProgramFailureException} in place of IOException.
+ */
+public class ForwardingOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
+  private OutputFormat<NullWritable, StructuredRecord> delegate;
+
+  private OutputFormat<NullWritable, StructuredRecord> getDelegateFormatInstance(Configuration configuration) {
+ if (delegate != null) {
+ return delegate;
+ }
+
+ String delegateClassName = configuration.get(
+ GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME);
+ try {
+      delegate = (OutputFormat<NullWritable, StructuredRecord>) ReflectionUtils
+ .newInstance(configuration.getClassByName(delegateClassName), configuration);
+ return delegate;
+ } catch (ClassNotFoundException e) {
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Unable to instantiate output format for class '%s'.", delegateClassName),
+ e.getMessage(), ErrorType.SYSTEM, false, e);
+ }
+ }
+
+ @Override
+  public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ return ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+ () -> getDelegateFormatInstance(
+ taskAttemptContext.getConfiguration()).getRecordWriter(taskAttemptContext));
+ }
+
+ @Override
+  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+ () -> getDelegateFormatInstance(jobContext.getConfiguration()).checkOutputSpecs(jobContext));
+ }
+
+ @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+ () -> getDelegateFormatInstance(
+ taskAttemptContext.getConfiguration()).getOutputCommitter(taskAttemptContext));
+ }
+}
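For orientation: the wrapper is selected purely through configuration. A sink registers
ForwardingOutputFormat as the job's output format and records the real format under
GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, the key this patch introduces. A hedged wiring sketch,
with the job-setup calls illustrative rather than taken from this patch:

  import io.cdap.plugin.gcp.common.GCPUtils;
  import io.cdap.plugin.gcp.gcs.sink.ForwardingOutputFormat;
  import io.cdap.plugin.gcp.gcs.sink.GCSOutputFormatProvider;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import java.io.IOException;

  class ForwardingWiringSketch {
    // Illustrative only: name the wrapped format in the configuration so that
    // ForwardingOutputFormat can instantiate it reflectively at runtime.
    static Job configureJob() throws IOException {
      Configuration conf = new Configuration();
      conf.set(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME,
        GCSOutputFormatProvider.GCSOutputFormat.class.getName());
      Job job = Job.getInstance(conf);
      job.setOutputFormatClass(ForwardingOutputFormat.class);
      return job;
    }
  }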
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingRecordWriter.java
new file mode 100644
index 000000000..aeaae898b
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingRecordWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+
+/**
+ * ForwardingRecordWriter which delegates all operations to another RecordWriter.
+ *
+ * This is used to wrap the RecordWriter of the delegate format and rethrow
+ * {@link io.cdap.cdap.api.exception.ProgramFailureException} in place of IOException.
+ */
+public class ForwardingRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
+  private final RecordWriter<NullWritable, StructuredRecord> delegate;
+
+ /**
+ * Constructor for ForwardingRecordWriter.
+ * @param delegate the delegate RecordWriter
+ */
+  public ForwardingRecordWriter(RecordWriter<NullWritable, StructuredRecord> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+  public void write(NullWritable nullWritable, StructuredRecord structuredRecord)
+ throws IOException, InterruptedException {
+ ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+ () -> delegate.write(nullWritable, structuredRecord));
+ }
+
+ @Override
+  public void close(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+ () -> delegate.close(taskAttemptContext));
+ }
+}
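The DelegatingGCSRecordWriter change in the first hunk of this patch shows the intended call-site
pattern; a short usage sketch, with format, context, and record assumed to be in scope:

  // Illustrative: wrap the delegate's writer so any IOException it throws
  // surfaces as an unchecked ProgramFailureException instead.
  RecordWriter<NullWritable, StructuredRecord> writer =
    new ForwardingRecordWriter(format.getRecordWriter(context));
  writer.write(NullWritable.get(), record);
  writer.close(context);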
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
index 129b6a297..96466b107 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
@@ -121,26 +121,42 @@ public void prepareRun(BatchSinkContext context) throws Exception {
collector.addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
collector.getOrThrowException();
- return;
}
- Credentials credentials = config.connection.getServiceAccount() == null ?
- null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
+
+ Credentials credentials = null;
+ try {
+ credentials = config.connection.getServiceAccount() == null ?
+ null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+ isServiceAccountFilePath);
+ } catch (Exception e) {
+ String errorReason = "Unable to load service account credentials.";
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+
+ String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
+    String errorReasonFormat = "Error code: %s. Unable to read or access GCS bucket.";
+ String correctiveAction = "Ensure you entered the correct bucket path and "
+ + "have permissions for it.";
Bucket bucket;
- String location;
+ String location = null;
try {
- bucket = storage.get(config.getBucket());
+ bucket = storage.get(bucketName);
+ if (bucket != null) {
+ location = bucket.getLocation();
+ } else {
+ location = config.getLocation();
+ GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
+ }
} catch (StorageException e) {
- throw new RuntimeException(
- String.format("Unable to access or create bucket %s. ", config.getBucket())
- + "Ensure you entered the correct bucket path and have permissions for it.", e);
- }
- if (bucket != null) {
- location = bucket.getLocation();
- } else {
- GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
- location = config.getLocation();
+ String errorReason = String.format(errorReasonFormat, e.getCode());
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
}
+
this.outputPath = getOutputDir(context);
// create asset for lineage
asset = Asset.builder(config.getReferenceName())
@@ -532,8 +548,15 @@ public void validateContentType(FailureCollector failureCollector) {
}
}
- public String getBucket() {
- return GCSPath.from(path).getBucket();
+ public String getBucket(FailureCollector collector) {
+ try {
+ return GCSPath.from(path).getBucket();
+ } catch (IllegalArgumentException e) {
+ collector.addFailure(e.getMessage(), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+ return null;
}
@Override
@@ -718,8 +741,8 @@ public Builder setCustomContentType(@Nullable String customContentType) {
return this;
}
- public GCSBatchSink.GCSBatchSinkConfig build() {
- return new GCSBatchSink.GCSBatchSinkConfig(
+ public GCSBatchSinkConfig build() {
+ return new GCSBatchSinkConfig(
referenceName,
project,
fileSystemProperties,
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
index 7b15839c1..3bf01cb25 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
@@ -129,33 +129,48 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
config.validate(collector, context.getArguments().asMap());
collector.getOrThrowException();
-    Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
-      config.getPath(), new HashMap<>());
Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());
-
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();
+
Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
if (isServiceAccountFilePath == null) {
- context.getFailureCollector().addFailure("Service account type is undefined.",
- "Must be `filePath` or `JSON`");
- context.getFailureCollector().getOrThrowException();
+ collector.addFailure("Service account type is undefined.",
+ "Must be `filePath` or `JSON`");
+ collector.getOrThrowException();
return;
}
- Credentials credentials = config.connection.getServiceAccount() == null ?
- null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
+
+ Credentials credentials = null;
+ try {
+ credentials = config.connection.getServiceAccount() == null ?
+ null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+ isServiceAccountFilePath);
+ } catch (Exception e) {
+ String errorReason = "Unable to load service account credentials.";
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+
+ String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
+    String errorReasonFormat = "Error code: %s. Unable to read or access GCS bucket.";
+ String correctiveAction = "Ensure you entered the correct bucket path and "
+ + "have permissions for it.";
try {
- if (storage.get(config.getBucket()) == null) {
- GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
+ if (storage.get(bucketName) == null) {
+ GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
}
} catch (StorageException e) {
- // Add more descriptive error message
- throw new RuntimeException(
- String.format("Unable to access or create bucket %s. ", config.getBucket())
- + "Ensure you entered the correct bucket path and have permissions for it.", e);
+ String errorReason = String.format(errorReasonFormat, e.getCode());
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
}
+    Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
+ config.getPath(), new HashMap<>());
if (config.getAllowFlexibleSchema()) {
//Configure MultiSink with support for flexible schemas.
configureSchemalessMultiSink(context, baseProperties, argumentCopy);
@@ -194,9 +209,10 @@ private void configureMultiSinkWithSchema(BatchSinkContext context,
config.splitField, name, schema));
outputProperties.put(FileOutputFormat.OUTDIR, config.getOutputDir(context.getLogicalStartTime(), name));
outputProperties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
+ outputProperties.put(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, RecordFilterOutputFormat.class.getName());
context.addOutput(Output.of(
config.getReferenceName() + "_" + name,
- new SinkOutputFormatProvider(RecordFilterOutputFormat.class.getName(), outputProperties)));
+ new SinkOutputFormatProvider(ForwardingOutputFormat.class.getName(), outputProperties)));
}
}
@@ -208,13 +224,14 @@ private void configureSchemalessMultiSink(BatchSinkContext context,
Map<String, String> outputProperties = new HashMap<>(baseProperties);
outputProperties.putAll(validatingOutputFormat.getOutputFormatConfiguration());
outputProperties.putAll(DelegatingGCSOutputFormat.configure(validatingOutputFormat.getOutputFormatClassName(),
+ DelegatingGCSOutputFormat.class.getName(),
config.splitField,
config.getOutputBaseDir(),
config.getOutputSuffix(context.getLogicalStartTime())));
outputProperties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
context.addOutput(Output.of(
config.getReferenceName(),
- new SinkOutputFormatProvider(DelegatingGCSOutputFormat.class.getName(), outputProperties)));
+ new SinkOutputFormatProvider(ForwardingOutputFormat.class.getName(), outputProperties)));
}
/**
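The extra DelegatingGCSOutputFormat.class.getName() argument above implies that
DelegatingGCSOutputFormat.configure gained a parameter naming the class to record under
GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME. A sketch of the assumed shape; every property key except
GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME is a hypothetical stand-in for the real constants:

  // Assumed shape only, inferred from the call site above rather than copied from source.
  public static Map<String, String> configure(String delegateClassName,
      String wrappedOutputFormatClassName, String splitField,
      String outputBaseDir, String outputSuffix) {
    Map<String, String> conf = new HashMap<>();
    conf.put(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, wrappedOutputFormatClassName);
    conf.put("delegating.gcs.output.delegate.class", delegateClassName); // hypothetical key
    conf.put("delegating.gcs.output.split.field", splitField);           // hypothetical key
    conf.put("delegating.gcs.output.base.dir", outputBaseDir);           // hypothetical key
    conf.put("delegating.gcs.output.suffix", outputSuffix);              // hypothetical key
    return conf;
  }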
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
index 1d57e6d28..bd0d70b7f 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
@@ -1,8 +1,13 @@
package io.cdap.plugin.gcp.gcs.sink;
import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.validation.FormatContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
+import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -36,13 +41,14 @@ public void validate(FormatContext context) {
@Override
public String getOutputFormatClassName() {
- return GCSOutputFormat.class.getName();
+ return ForwardingOutputFormat.class.getName();
}
@Override
public Map<String, String> getOutputFormatConfiguration() {
Map<String, String> outputFormatConfiguration = new HashMap<>(delegate.getOutputFormatConfiguration());
outputFormatConfiguration.put(DELEGATE_OUTPUTFORMAT_CLASSNAME, delegate.getOutputFormatClassName());
+ outputFormatConfiguration.put(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, GCSOutputFormat.class.getName());
return outputFormatConfiguration;
}
@@ -52,7 +58,7 @@ public Map getOutputFormatConfiguration() {
public static class GCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
private OutputFormat<NullWritable, StructuredRecord> delegateFormat;
-    private OutputFormat<NullWritable, StructuredRecord> getDelegateFormatInstance(Configuration configuration) throws IOException {
+    private OutputFormat<NullWritable, StructuredRecord> getDelegateFormatInstance(Configuration configuration) {
if (delegateFormat != null) {
return delegateFormat;
}
@@ -63,9 +69,9 @@ private OutputFormat getDelegateFormatInstance(Configuration configuration) thro
.newInstance(configuration.getClassByName(delegateClassName), configuration);
return delegateFormat;
} catch (ClassNotFoundException e) {
- throw new IOException(
- String.format("Unable to instantiate output format for class %s", delegateClassName),
- e);
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Unable to instantiate output format for class '%s'.", delegateClassName),
+ e.getMessage(), ErrorType.SYSTEM, false, e);
}
}
@@ -74,7 +80,7 @@ public RecordWriter getRecordWriter(TaskAttemptC
IOException, InterruptedException {
RecordWriter<NullWritable, StructuredRecord> originalWriter = getDelegateFormatInstance(taskAttemptContext.getConfiguration())
.getRecordWriter(taskAttemptContext);
- return new GCSRecordWriter(originalWriter);
+ return new ForwardingRecordWriter(new GCSRecordWriter(originalWriter));
}
@Override
@@ -87,7 +93,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
InterruptedException {
OutputCommitter delegateCommitter = getDelegateFormatInstance(taskAttemptContext.getConfiguration())
.getOutputCommitter(taskAttemptContext);
- return new GCSOutputCommitter(delegateCommitter);
+ return new ForwardingOutputCommitter(new GCSOutputCommitter(delegateCommitter));
}
}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
index f4f5d6cac..82b0ebd2d 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
@@ -18,6 +18,11 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -68,7 +73,8 @@ public RecordWriter getRecordWriter(TaskAttemptC
String passthroughVal = hConf.get(PASS_VALUE);
Schema schema = Schema.parseJson(hConf.get(ORIGINAL_SCHEMA));
- return new FilterRecordWriter(delegate, filterField, passthroughVal, schema);
+ return new ForwardingRecordWriter(
+ new FilterRecordWriter(delegate, filterField, passthroughVal, schema));
}
@Override
@@ -78,8 +84,9 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- OutputCommitter outputCommitter = getDelegateFormat(context.getConfiguration()).getOutputCommitter(context);
- return new GCSOutputCommitter(outputCommitter);
+ Configuration hConf = context.getConfiguration();
+ OutputCommitter outputCommitter = getDelegateFormat(hConf).getOutputCommitter(context);
+ return new ForwardingOutputCommitter(new GCSOutputCommitter(outputCommitter));
}
private OutputFormat<NullWritable, StructuredRecord> getDelegateFormat(Configuration hConf) throws IOException {
@@ -89,7 +96,9 @@ private OutputFormat getDelegateFormat(Configuration hConf) throws IOException {
(Class<OutputFormat<NullWritable, StructuredRecord>>) hConf.getClassByName(delegateClassName);
return delegateClass.newInstance();
} catch (Exception e) {
- throw new IOException("Unable to instantiate output format for class " + delegateClassName, e);
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        String.format("Unable to instantiate output format for class '%s'.", delegateClassName),
+ e.getMessage(), ErrorType.SYSTEM, false, e);
}
}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
index 7bfec4dd8..9283595f2 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
@@ -86,22 +86,52 @@ protected String getEmptyInputFormatClassName() {
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
// Get location of the source for lineage
- String location;
- String bucketName = GCSPath.from(config.getPath()).getBucket();
- Credentials credentials = config.connection.getServiceAccount() == null ?
- null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
- config.connection.isServiceAccountFilePath());
+ String location = null;
+ String bucketName = null;
+ String path = config.getPath();
+ FailureCollector collector = context.getFailureCollector();
+
+ try {
+ bucketName = GCSPath.from(path).getBucket();
+ } catch (IllegalArgumentException e) {
+ collector.addFailure(e.getMessage(), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+
+ Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
+ if (isServiceAccountFilePath == null) {
+ collector.addFailure("Service account type is undefined.",
+ "Must be `filePath` or `JSON`");
+ collector.getOrThrowException();
+ }
+
+ Credentials credentials = null;
+ try {
+ credentials = config.connection.getServiceAccount() == null ?
+ null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+ isServiceAccountFilePath);
+ } catch (Exception e) {
+ String errorReason = "Unable to load service account credentials.";
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
+
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
try {
location = storage.get(bucketName).getLocation();
} catch (StorageException e) {
- throw new RuntimeException(
- String.format("Unable to access bucket %s. ", bucketName)
- + "Ensure you entered the correct bucket path and have permissions for it.", e);
+      String errorReason = String.format("Error code: %s. Unable to access GCS bucket '%s'.",
+ e.getCode(), bucketName);
+ collector.addFailure(String.format("%s %s", errorReason, e.getMessage()),
+ "Ensure you entered the correct bucket path and have permissions for it.")
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
}
// create asset for lineage
- String fqn = GCSPath.getFQN(config.getPath());
+ String fqn = GCSPath.getFQN(path);
String referenceName = Strings.isNullOrEmpty(config.getReferenceName())
? ReferenceNames.normalizeFqn(fqn)
: config.getReferenceName();
@@ -142,7 +172,7 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
@Override
protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
-    lineageRecorder.recordRead("Read", String.format("Read%sfrom Google Cloud Storage.",
-                                                      config.isEncrypted() ? " and decrypt " : " "), outputFields);
+    lineageRecorder.recordRead("Read", String.format("Read%s from Google Cloud Storage.",
+                                                      config.isEncrypted() ? " and decrypt" : ""), outputFields);
}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
index f6092f66f..424badbc4 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
@@ -24,6 +24,10 @@
import com.google.crypto.tink.StreamingAead;
import com.google.crypto.tink.config.TinkConfig;
import com.google.crypto.tink.integration.gcpkms.GcpKmsClient;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.crypto.Decryptor;
import io.cdap.plugin.gcp.crypto.FSInputSeekableByteChannel;
import org.apache.hadoop.conf.Configurable;
@@ -66,20 +72,22 @@ public TinkDecryptor() throws GeneralSecurityException {
@Override
public SeekableByteChannel open(FileSystem fs, Path path, int bufferSize) throws IOException {
DecryptInfo decryptInfo = getDecryptInfo(fs, path);
+ Path metadataPath = new Path(path.getParent(), path.getName() + metadataSuffix);
if (decryptInfo == null) {
- throw new IllegalArgumentException("Missing encryption metadata for file '" + path
- + "'. Expected metadata path is '"
- + new Path(path.getParent(), path.getName() + metadataSuffix) + "'");
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Missing encryption metadata for file '%s'. "
+ + "Expected metadata path is '%s'.", path, metadataPath), null,
+ ErrorType.USER, false, null);
}
try {
StreamingAead streamingAead = decryptInfo.getKeysetHandle().getPrimitive(StreamingAead.class);
return streamingAead.newSeekableDecryptingChannel(new FSInputSeekableByteChannel(fs, path, bufferSize),
decryptInfo.getAad());
- } catch (IOException e) {
- throw e;
} catch (Exception e) {
- throw new IOException(e);
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.",
+ path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e);
}
}
@@ -88,7 +96,9 @@ public void setConf(Configuration configuration) {
this.configuration = configuration;
this.metadataSuffix = configuration.get(METADATA_SUFFIX);
if (metadataSuffix == null) {
- throw new IllegalArgumentException("Missing configuration '" + METADATA_SUFFIX + "'");
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Missing configuration '%s'.", METADATA_SUFFIX), null,
+ ErrorType.USER, false, null);
}
}
@@ -112,7 +122,13 @@ private DecryptInfo getDecryptInfo(FileSystem fs, Path path) throws IOException
}
// Create the DecryptInfo
- return getDecryptInfo(metadata);
+ try {
+ return getDecryptInfo(metadata);
+ } catch (Exception e) {
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+ String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.",
+ path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e);
+ }
}
static DecryptInfo getDecryptInfo(JSONObject metadata) throws IOException {
diff --git a/src/test/java/io/cdap/plugin/gcp/dataplex/sink/DataPlexOutputFormatProviderTest.java b/src/test/java/io/cdap/plugin/gcp/dataplex/sink/DataPlexOutputFormatProviderTest.java
index 37950d531..21bedf5b5 100644
--- a/src/test/java/io/cdap/plugin/gcp/dataplex/sink/DataPlexOutputFormatProviderTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/dataplex/sink/DataPlexOutputFormatProviderTest.java
@@ -18,6 +18,7 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.validation.FormatContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.gcp.dataplex.common.util.DataplexConstants;
@@ -238,7 +239,7 @@ public void testGetRecordWriterWithDifferentDataset() throws IOException, Interr
configuration.set("mapred.bq.output.gcs.fileformat", "AVRO");
configuration.set(DELEGATE_OUTPUTFORMAT_CLASSNAME, "class");
when(mockContext.getConfiguration()).thenReturn(configuration);
- Assert.assertThrows(IOException.class, () -> {
+ Assert.assertThrows(ProgramFailureException.class, () -> {
dataplexOutputFormat.getRecordWriter(mockContext);
});
}
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java
index 798cd4392..17c4be0a9 100644
--- a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java
@@ -25,8 +25,9 @@ public class GCSOutputformatProviderTest {
@Test
public void testRecordWriter() throws IOException, InterruptedException {
RecordWriter mockWriter = Mockito.mock(RecordWriter.class);
- GCSOutputFormatProvider.GCSRecordWriter recordWriterToTest = new GCSOutputFormatProvider.GCSRecordWriter(
- mockWriter);
+ ForwardingRecordWriter recordWriterToTest =
+ new ForwardingRecordWriter(new GCSOutputFormatProvider.GCSRecordWriter(
+ mockWriter));
NullWritable mockWritable = Mockito.mock(NullWritable.class);
StructuredRecord mockRecord = Mockito.mock(StructuredRecord.class);
for (int i = 0; i < 5; i++) {
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java
index 91cc1d607..1dac23a7a 100644
--- a/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java
@@ -87,8 +87,10 @@ public TestDelegatingGCSOutputCommitter() throws IOException {
private void writeOutput(TaskAttemptContext context, DelegatingGCSOutputCommitter committer) throws IOException,
InterruptedException {
NullWritable nullWritable = NullWritable.get();
- DelegatingGCSRecordWriter delegatingGCSRecordWriter = new DelegatingGCSRecordWriter(context, key1,
- committer);
+ ForwardingRecordWriter delegatingGCSRecordWriter = new ForwardingRecordWriter(
+ new DelegatingGCSRecordWriter(context, key1,
+ new ForwardingOutputCommitter(committer),
+ new DelegatingGCSOutputFormat()));
try {
delegatingGCSRecordWriter.write(nullWritable, record1);
delegatingGCSRecordWriter.write(nullWritable, record2);