Skip to content

Commit 7f24625

Browse files
authored
Merge pull request #1912 from cloudsufi/filePLuginErrorMang
[PLUGIN-1848] Error Management for File plugin Source/Sink
2 parents 8a81b24 + 6f4b5d8 commit 7f24625

File tree

8 files changed

+243
-39
lines changed

8 files changed

+243
-39
lines changed

core-plugins/src/main/java/io/cdap/plugin/batch/sink/FileSink.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@
2323
import io.cdap.cdap.api.annotation.Macro;
2424
import io.cdap.cdap.api.annotation.Name;
2525
import io.cdap.cdap.api.annotation.Plugin;
26+
import io.cdap.cdap.api.exception.ErrorCategory;
27+
import io.cdap.cdap.api.exception.ErrorType;
28+
import io.cdap.cdap.api.exception.ErrorUtils;
2629
import io.cdap.cdap.etl.api.FailureCollector;
2730
import io.cdap.cdap.etl.api.batch.BatchSink;
2831
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
32+
import io.cdap.plugin.batch.source.FileErrorDetailsProvider;
2933
import io.cdap.plugin.format.plugin.AbstractFileSink;
3034
import io.cdap.plugin.format.plugin.AbstractFileSinkConfig;
3135

@@ -48,6 +52,11 @@ public FileSink(Conf config) {
4852
this.config = config;
4953
}
5054

55+
@Override
56+
protected String getErrorDetailsProviderClassName() {
57+
return FileErrorDetailsProvider.class.getName();
58+
}
59+
5160
@Override
5261
protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
5362
return config.getFSProperties();
@@ -85,7 +94,8 @@ public void validate(FailureCollector collector) {
8594
try {
8695
getFSProperties();
8796
} catch (IllegalArgumentException e) {
88-
collector.addFailure("File system properties must be a valid json.", null)
97+
collector.addFailure(String.format("File system properties must be a valid json. %s: %s",
98+
e.getClass().getName(), e.getMessage()), null)
8999
.withConfigProperty(NAME_FILE_SYSTEM_PROPERTIES).withStacktrace(e.getStackTrace());
90100
}
91101
}
@@ -97,7 +107,12 @@ private Map<String, String> getFSProperties() {
97107
try {
98108
return GSON.fromJson(fileSystemProperties, MAP_TYPE);
99109
} catch (JsonSyntaxException e) {
100-
throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e);
110+
String errorMessage = String.format(
111+
"Failed to parse filesystem properties %s with message: %s: %s", fileSystemProperties,
112+
e.getClass().getName(), e.getMessage());
113+
throw ErrorUtils.getProgramFailureException(
114+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
115+
ErrorType.USER, false, e);
101116
}
102117
}
103118
}

core-plugins/src/main/java/io/cdap/plugin/batch/source/FileBatchSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public FileBatchSource(FileSourceConfig config) {
4848
this.config = config;
4949
}
5050

51+
@Override
52+
protected String getErrorDetailsProviderClassName() {
53+
return FileErrorDetailsProvider.class.getName();
54+
}
55+
5156
@Override
5257
protected Map<String, String> getFileSystemProperties(BatchSourceContext context) {
5358
Map<String, String> properties = new HashMap<>(config.getFileSystemProperties());
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.batch.source;
18+
19+
import io.cdap.plugin.common.HydratorErrorDetailsProvider;
20+
21+
/**
22+
* FileErrorDetails provider
23+
*/
24+
public class FileErrorDetailsProvider extends HydratorErrorDetailsProvider {
25+
26+
}

core-plugins/src/main/java/io/cdap/plugin/batch/source/FileSourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ Map<String, String> getFileSystemProperties() {
124124
try {
125125
return GSON.fromJson(fileSystemProperties, MAP_STRING_STRING_TYPE);
126126
} catch (Exception e) {
127-
throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e);
127+
throw new IllegalArgumentException(String.format("Unable to parse filesystem properties: %s", e.getMessage()), e);
128128
}
129129
}
130130

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.common;
18+
19+
import com.google.common.base.Throwables;
20+
import io.cdap.cdap.api.exception.ErrorCategory;
21+
import io.cdap.cdap.api.exception.ErrorType;
22+
import io.cdap.cdap.api.exception.ErrorUtils;
23+
import io.cdap.cdap.api.exception.ProgramFailureException;
24+
import io.cdap.cdap.etl.api.exception.ErrorContext;
25+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
26+
27+
import java.util.List;
28+
import javax.annotation.Nullable;
29+
30+
/**
31+
* Error details provided for the Hydrator plugins
32+
**/
33+
public class HydratorErrorDetailsProvider implements ErrorDetailsProvider {
34+
35+
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s";
36+
37+
@Nullable
38+
@Override
39+
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
40+
List<Throwable> causalChain = Throwables.getCausalChain(e);
41+
for (Throwable t : causalChain) {
42+
if (t instanceof ProgramFailureException) {
43+
// if causal chain already has program failure exception, return null to avoid double wrap.
44+
return null;
45+
}
46+
if (t instanceof IllegalArgumentException) {
47+
return getProgramFailureException((IllegalArgumentException) t, errorContext,
48+
ErrorType.USER);
49+
}
50+
}
51+
return null;
52+
}
53+
54+
/**
55+
* Get a ProgramFailureException with the given error information from {@link Exception}.
56+
*
57+
* @param exception The Exception to get the error information from.
58+
* @return A ProgramFailureException with the given error information.
59+
*/
60+
private ProgramFailureException getProgramFailureException(IllegalArgumentException exception,
61+
ErrorContext errorContext, ErrorType errorType) {
62+
String errorMessage = exception.getMessage();
63+
return ErrorUtils.getProgramFailureException(
64+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
65+
String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), errorMessage), errorType,
66+
false, exception);
67+
}
68+
}

format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSink.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616

1717
package io.cdap.plugin.format.plugin;
1818

19+
import com.google.common.base.Strings;
1920
import io.cdap.cdap.api.data.batch.Output;
2021
import io.cdap.cdap.api.data.format.StructuredRecord;
2122
import io.cdap.cdap.api.data.schema.Schema;
2223
import io.cdap.cdap.api.dataset.lib.KeyValue;
24+
import io.cdap.cdap.api.exception.ErrorCategory;
25+
import io.cdap.cdap.api.exception.ErrorType;
26+
import io.cdap.cdap.api.exception.ErrorUtils;
2327
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
2428
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
2529
import io.cdap.cdap.api.plugin.PluginConfig;
@@ -28,6 +32,7 @@
2832
import io.cdap.cdap.etl.api.PipelineConfigurer;
2933
import io.cdap.cdap.etl.api.batch.BatchSink;
3034
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
35+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3136
import io.cdap.cdap.etl.api.validation.FormatContext;
3237
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
3338
import io.cdap.plugin.common.LineageRecorder;
@@ -99,11 +104,12 @@ protected ValidatingOutputFormat getValidatingOutputFormat(PipelineConfigurer pi
99104
}
100105

101106
@Override
102-
public void prepareRun(BatchSinkContext context) throws Exception {
107+
public void prepareRun(BatchSinkContext context) {
103108
FailureCollector collector = context.getFailureCollector();
104109
config.validate(collector, context.getArguments().asMap());
105110
String format = config.getFormatName();
106-
ValidatingOutputFormat validatingOutputFormat = getOutputFormatForRun(context);
111+
ValidatingOutputFormat validatingOutputFormat = null;
112+
validatingOutputFormat = getOutputFormatForRun(context, collector);
107113
FormatContext formatContext = new FormatContext(collector, context.getInputSchema());
108114
validateOutputFormatProvider(formatContext, format, validatingOutputFormat);
109115
collector.getOrThrowException();
@@ -124,12 +130,21 @@ public void prepareRun(BatchSinkContext context) throws Exception {
124130
Map<String, String> outputProperties = new HashMap<>(validatingOutputFormat.getOutputFormatConfiguration());
125131
outputProperties.putAll(getFileSystemProperties(context));
126132
outputProperties.put(FileOutputFormat.OUTDIR, getOutputDir(context));
133+
if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) {
134+
context.setErrorDetailsProvider(
135+
new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
136+
}
127137
context.addOutput(Output.of(config.getReferenceName(),
128138
new SinkOutputFormatProvider(validatingOutputFormat.getOutputFormatClassName(),
129139
outputProperties)));
130140
}
131141

132-
protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException {
142+
protected String getErrorDetailsProviderClassName() {
143+
return null;
144+
}
145+
146+
protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context,
147+
FailureCollector collector) {
133148
String fileFormat = config.getFormatName();
134149
try {
135150
return context.newPluginInstance(fileFormat);
@@ -138,11 +153,21 @@ protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context)
138153
for (InvalidPluginProperty invalidProperty : e.getInvalidProperties()) {
139154
properties.add(invalidProperty.getName());
140155
}
141-
String errorMessage = String.format("Format '%s' cannot be used because properties %s were not provided or " +
142-
"were invalid when the pipeline was deployed. Set the format to a " +
143-
"different value, or re-create the pipeline with all required properties.",
144-
fileFormat, properties);
145-
throw new IllegalArgumentException(errorMessage, e);
156+
String errorMessage = String.format(
157+
"Format '%s' cannot be used because properties %s were not provided or "
158+
+ "were invalid when the pipeline was deployed. Set the format to a "
159+
+ "different value, or re-create the pipeline with all required properties. %s: %s",
160+
fileFormat, properties, e.getClass().getName(), e.getMessage());
161+
throw ErrorUtils.getProgramFailureException(
162+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
163+
ErrorType.USER, false, e);
164+
} catch (InstantiationException e) {
165+
collector.addFailure(
166+
String.format("Could not load the output format %s, %s: %s", fileFormat,
167+
e.getClass().getName(), e.getMessage()), null)
168+
.withPluginNotFound(fileFormat, fileFormat, ValidatingOutputFormat.PLUGIN_TYPE)
169+
.withStacktrace(e.getStackTrace());
170+
throw collector.getOrThrowException();
146171
}
147172
}
148173

@@ -189,9 +214,8 @@ private void validateOutputFormatProvider(FormatContext context, String format,
189214
@Nullable ValidatingOutputFormat validatingOutputFormat) {
190215
FailureCollector collector = context.getFailureCollector();
191216
if (validatingOutputFormat == null) {
192-
collector.addFailure(
193-
String.format("Could not find the '%s' output format plugin.", format), null)
194-
.withPluginNotFound(format, format, ValidatingOutputFormat.PLUGIN_TYPE);
217+
collector.addFailure(String.format("Could not load the output format %s.", format), null)
218+
.withPluginNotFound(format, format, ValidatingOutputFormat.PLUGIN_TYPE);
195219
} else {
196220
validatingOutputFormat.validate(context);
197221
}

format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSinkConfig.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import io.cdap.cdap.api.annotation.Description;
2121
import io.cdap.cdap.api.annotation.Macro;
2222
import io.cdap.cdap.api.data.schema.Schema;
23+
import io.cdap.cdap.api.exception.ErrorCategory;
24+
import io.cdap.cdap.api.exception.ErrorType;
25+
import io.cdap.cdap.api.exception.ErrorUtils;
2326
import io.cdap.cdap.api.plugin.PluginConfig;
2427
import io.cdap.cdap.etl.api.FailureCollector;
2528
import io.cdap.plugin.common.IdUtils;
@@ -90,8 +93,10 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
9093
try {
9194
new SimpleDateFormat(suffix);
9295
} catch (IllegalArgumentException e) {
93-
collector.addFailure("Invalid suffix.", "Ensure provided suffix is valid.")
94-
.withConfigProperty(NAME_SUFFIX).withStacktrace(e.getStackTrace());
96+
collector.addFailure(
97+
String.format("Invalid suffix, %s: %s", e.getClass().getName(), e.getMessage()),
98+
"Ensure provided suffix is valid.").withConfigProperty(NAME_SUFFIX)
99+
.withStacktrace(e.getStackTrace());
95100
}
96101
}
97102

@@ -125,7 +130,11 @@ public Schema getSchema() {
125130
try {
126131
return Schema.parseJson(schema);
127132
} catch (IOException e) {
128-
throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
133+
String errorMessage = String.format("Invalid schema %s, %s: %s", schema,
134+
e.getClass().getName(), e.getMessage());
135+
throw ErrorUtils.getProgramFailureException(
136+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
137+
ErrorType.USER, false, e);
129138
}
130139
}
131140

0 commit comments

Comments
 (0)