Skip to content

Commit f9ece97

Browse files
authored
Merge pull request #726 from cloudsufi/wranglerTransformPlugin
[PLUGIN-1856] Error management for Wrangler plugin
2 parents 0f03e70 + f26ecd2 commit f9ece97

File tree

2 files changed

+206
-29
lines changed

2 files changed

+206
-29
lines changed

wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import io.cdap.cdap.api.annotation.Plugin;
2525
import io.cdap.cdap.api.data.format.StructuredRecord;
2626
import io.cdap.cdap.api.data.schema.Schema;
27+
import io.cdap.cdap.api.exception.ErrorCategory;
28+
import io.cdap.cdap.api.exception.ErrorType;
29+
import io.cdap.cdap.api.exception.ErrorUtils;
2730
import io.cdap.cdap.api.metrics.Metrics;
2831
import io.cdap.cdap.api.plugin.PluginConfig;
2932
import io.cdap.cdap.api.plugin.PluginProperties;
@@ -53,6 +56,7 @@
5356
import io.cdap.wrangler.api.EntityCountMetric;
5457
import io.cdap.wrangler.api.ErrorRecord;
5558
import io.cdap.wrangler.api.ExecutorContext;
59+
import io.cdap.wrangler.api.RecipeException;
5660
import io.cdap.wrangler.api.RecipeParser;
5761
import io.cdap.wrangler.api.RecipePipeline;
5862
import io.cdap.wrangler.api.RecipeSymbol;
@@ -243,9 +247,17 @@ public void configurePipeline(PipelineConfigurer configurer) {
243247
}
244248
}
245249
} catch (CompileException e) {
246-
collector.addFailure("Compilation error occurred : " + e.getMessage(), null);
250+
collector.addFailure(
251+
String.format("Compilation error occurred, %s: %s ", e.getClass().getName(),
252+
e.getMessage()), null);
247253
} catch (DirectiveParseException e) {
248-
collector.addFailure(e.getMessage(), null);
254+
collector.addFailure(
255+
String.format("Error parsing directive, %s: %s", e.getClass().getName(),
256+
e.getMessage()), null);
257+
} catch (DirectiveLoadException e) {
258+
collector.addFailure(
259+
String.format("Error loading directive, %s: %s", e.getClass().getName(),
260+
e.getMessage()), null);
249261
}
250262

251263
// Based on the configuration create output schema.
@@ -254,18 +266,22 @@ public void configurePipeline(PipelineConfigurer configurer) {
254266
oSchema = Schema.parseJson(config.schema);
255267
}
256268
} catch (IOException e) {
257-
collector.addFailure("Invalid output schema.", null)
258-
.withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
269+
collector.addFailure(
270+
String.format("Invalid output schema %s: %s", e.getClass().getName(), e.getMessage()),
271+
null).withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
259272
}
260273

261274
// Check if jexl pre-condition is not null or empty and if so compile expression.
262-
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
275+
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(
276+
Config.NAME_PRECONDITION_LANGUAGE)) {
263277
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
264278
&& checkPreconditionNotEmpty(false)) {
265279
try {
266280
new Precondition(config.getPreconditionJEXL());
267281
} catch (PreconditionException e) {
268-
collector.addFailure(e.getMessage(), null).withConfigProperty(Config.NAME_PRECONDITION);
282+
collector.addFailure(String.format("Error compiling precondition expression, %s: %s",
283+
e.getClass().getName(), e.getMessage()), null)
284+
.withConfigProperty(Config.NAME_PRECONDITION);
269285
}
270286
}
271287
}
@@ -274,11 +290,10 @@ && checkPreconditionNotEmpty(false)) {
274290
if (oSchema != null) {
275291
configurer.getStageConfigurer().setOutputSchema(oSchema);
276292
}
277-
278293
} catch (Exception e) {
279-
LOG.error(e.getMessage());
280294
collector.addFailure("Error occurred : " + e.getMessage(), null).withStacktrace(e.getStackTrace());
281295
}
296+
collector.getOrThrowException();
282297
}
283298

284299
/**
@@ -319,7 +334,14 @@ public void prepareRun(StageSubmitterContext context) throws Exception {
319334
// Parse the recipe and extract all the instances of directives
320335
// to be processed for extracting lineage.
321336
RecipeParser recipe = getRecipeParser(context);
322-
List<Directive> directives = recipe.parse();
337+
List<Directive> directives;
338+
try {
339+
directives = recipe.parse();
340+
} catch (Exception e) {
341+
String errorReason = "Unable to parse recipe and extract all instances of directives.";
342+
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
343+
ErrorType.USER);
344+
}
323345
emitDirectiveMetrics(directives, context.getMetrics());
324346

325347
LineageOperations lineageOperations = new LineageOperations(input, output, directives);
@@ -344,11 +366,13 @@ public void initialize(TransformContext context) throws Exception {
344366
// Based on the configuration create output schema.
345367
try {
346368
oSchema = Schema.parseJson(config.schema);
347-
} catch (IOException e) {
348-
throw new IllegalArgumentException(
349-
String.format("Stage:%s - Format of output schema specified is invalid. Please check the format.",
350-
context.getStageName()), e
351-
);
369+
} catch (Exception e) {
370+
String errorReason = "Invalid output schema format.";
371+
String errorMessage = String.format(
372+
"Format of output schema specified is invalid. Please check the format. %s: %s",
373+
e.getClass().getName(), e.getMessage());
374+
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason,
375+
errorMessage, ErrorType.USER);
352376
}
353377

354378
// Check if jexl pre-condition is not null or empty and if so compile expression.
@@ -357,8 +381,10 @@ public void initialize(TransformContext context) throws Exception {
357381
&& checkPreconditionNotEmpty(false)) {
358382
try {
359383
condition = new Precondition(config.getPreconditionJEXL());
360-
} catch (PreconditionException e) {
361-
throw new IllegalArgumentException(e.getMessage(), e);
384+
} catch (Exception e) {
385+
String errorReason = "Failed to evaluate precondition due to an invalid JEXL expression.";
386+
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
387+
ErrorType.USER);
362388
}
363389
}
364390
}
@@ -367,7 +393,12 @@ && checkPreconditionNotEmpty(false)) {
367393
// Create the pipeline executor with context being set.
368394
pipeline = new RecipePipelineExecutor(recipe, ctx);
369395
} catch (Exception e) {
370-
throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e);
396+
String errorReason = "Unable to compile the recipe and execute directives.";
397+
String errorMessage = String.format(
398+
"Error compiling the recipe and executing directives. " + "%s: %s",
399+
e.getClass().getName(), e.getMessage());
400+
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason,
401+
errorMessage, ErrorType.USER);
371402
}
372403

373404
String defaultStrategy = context.getArguments().get(ERROR_STRATEGY_DEFAULT);
@@ -437,8 +468,11 @@ && checkPreconditionNotEmpty(false)) {
437468
}
438469
if (WRANGLER_FAIL_PIPELINE_FOR_ERROR.isEnabled(getContext())
439470
&& onErrorStrategy.equalsIgnoreCase(ON_ERROR_FAIL_PIPELINE)) {
440-
throw new Exception(
441-
String.format("Errors in Wrangler Transformation - %s", errorMessages));
471+
String errorReason = String.format("Errors in Wrangler Transformation - %s",
472+
errorMessages);
473+
throw ErrorUtils.getProgramFailureException(
474+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorReason,
475+
ErrorType.UNKNOWN, false, null);
442476
}
443477
}
444478
} catch (Exception e) {
@@ -457,8 +491,12 @@ && checkPreconditionNotEmpty(false)) {
457491
getContext().getStageName(), e.getMessage()),
458492
"value", String.valueOf(errorCounter)
459493
));
460-
throw new Exception(String.format("Stage:%s - Failing pipeline due to error : %s",
461-
getContext().getStageName(), e.getMessage()), e);
494+
String errorReason = "Error occurred while processing input data, possibly due to invalid "
495+
+ "transformation or schema mismatch.";
496+
String errorMessage = String.format("Pipeline failed at stage:%s, %s: %s",
497+
getContext().getStageName(), e.getClass().getName(), e.getMessage());
498+
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason,
499+
errorMessage, ErrorType.UNKNOWN);
462500
}
463501
// If it's 'skip-on-error' we continue processing and don't emit any error records.
464502
return;
@@ -553,18 +591,29 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) {
553591
* @throws DirectiveLoadException
554592
* @throws DirectiveParseException
555593
*/
556-
private RecipeParser getRecipeParser(StageContext context)
557-
throws DirectiveLoadException, DirectiveParseException {
594+
private RecipeParser getRecipeParser(StageContext context) {
558595

559596
registry = new CompositeDirectiveRegistry(SystemDirectiveRegistry.INSTANCE, new UserDirectiveRegistry(context));
560-
registry.reload(context.getNamespace());
597+
try {
598+
registry.reload(context.getNamespace());
599+
} catch (Exception e) {
600+
String errorReason = "Unable to load directive from the artifacts.";
601+
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
602+
ErrorType.USER);
603+
}
561604

562605
String directives = config.getDirectives();
563606
if (config.getUDDs() != null && !config.getUDDs().trim().isEmpty()) {
564607
directives = String.format("#pragma load-directives %s;%s", config.getUDDs(), config.getDirectives());
565608
}
566609

567-
return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry);
610+
try {
611+
return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry);
612+
} catch (Exception e) {
613+
String errorReason = "Unable to parse the directives.";
614+
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
615+
ErrorType.USER);
616+
}
568617
}
569618

570619
@Override
@@ -573,7 +622,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, R
573622
&& checkPreconditionNotEmpty(true)) {
574623

575624
if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) {
576-
throw new RuntimeException("SQL Precondition feature is not available");
625+
String errorReason = "SQL Precondition feature is not available";
626+
throw ErrorUtils.getProgramFailureException(
627+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorReason,
628+
ErrorType.SYSTEM, false, null);
577629
}
578630

579631
Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext);
@@ -598,11 +650,18 @@ private Optional<ExpressionFactory<String>> getExpressionFactory(RelationalTranf
598650
* @param directives a list of Wrangler directives
599651
* @param metrics CDAP {@link Metrics} object using which metrics can be emitted
600652
*/
601-
private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) throws DirectiveLoadException {
653+
private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) {
602654
for (Directive directive : directives) {
603655
// skip emitting metrics if the directive is not system directive
604-
if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) {
605-
continue;
656+
try {
657+
if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) {
658+
continue;
659+
}
660+
} catch (Exception e) {
661+
String errorReason = String.format("Unable to load directive %s",
662+
directive.define().getDirectiveName());
663+
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
664+
ErrorType.USER);
606665
}
607666
List<EntityCountMetric> countMetrics = new ArrayList<>();
608667

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.wrangler;
18+
19+
import com.google.common.base.Throwables;
20+
import com.google.common.collect.ImmutableMap;
21+
import io.cdap.cdap.api.exception.ErrorCategory;
22+
import io.cdap.cdap.api.exception.ErrorType;
23+
import io.cdap.cdap.api.exception.ErrorUtils;
24+
import io.cdap.cdap.api.exception.ProgramFailureException;
25+
import io.cdap.wrangler.api.DirectiveExecutionException;
26+
import io.cdap.wrangler.api.DirectiveLoadException;
27+
import io.cdap.wrangler.api.DirectiveNotFoundException;
28+
import io.cdap.wrangler.api.DirectiveParseException;
29+
import io.cdap.wrangler.api.RecipeException;
30+
import io.cdap.wrangler.expression.ELException;
31+
import io.cdap.wrangler.utils.RecordConvertorException;
32+
import java.util.List;
33+
import java.util.Map;
34+
35+
/**
36+
* Error util file to handle exceptions caught in Wrangler plugin
37+
*/
38+
public final class WranglerErrorUtil {
39+
40+
41+
private static final Map<String, String> TERMINAL_EXCEPTIONS = ImmutableMap.<String, String>builder()
42+
.put(DirectiveParseException.class.getName(), "Parsing-Directive")
43+
.put(PreconditionException.class.getName(), "Precondition")
44+
.put(DirectiveExecutionException.class.getName(), "Executing-Directive")
45+
.put(DirectiveLoadException.class.getName(), "Loading-Directive")
46+
.put(DirectiveNotFoundException.class.getName(), "Directive-Not-Found")
47+
.put(RecordConvertorException.class.getName(), "Record-Conversion")
48+
.put(ELException.class.getName(), "ExpressionLanguage-Parsing").build();
49+
50+
private static final Map<String, String> NON_TERMINAL_EXCEPTIONS = ImmutableMap.<String, String>builder()
51+
.put(RecipeException.class.getName(), "Executing-Recipe").build();
52+
53+
/**
54+
* Private constructor to prevent instantiation of this utility class.
55+
* <p>
56+
* This class is designed to contain only static utility methods for handling exceptions and
57+
* should not be instantiated. Any attempt to create an instance of this class will result in an
58+
* {@link IllegalStateException}.
59+
*/
60+
private WranglerErrorUtil() {
61+
throw new IllegalStateException("Utility class");
62+
}
63+
64+
/**
65+
* Traverses the causal chain of the given Throwable to find specific exceptions. If a terminal
66+
* exception is found, it returns a corresponding ProgramFailureException. If a non-terminal
67+
* exception is found, it is stored as a fallback. Otherwise, a generic ProgramFailureException is
68+
* returned.
69+
*
70+
* @param e the Throwable to analyze
71+
* @param errorReason the error reason to tell the cause of error
72+
* @param errorMessage default error message if no terminal exception is found
73+
* @param errorType the error type to categorize the failure
74+
* @return a ProgramFailureException with specific or generic error details
75+
*/
76+
public static ProgramFailureException getProgramFailureExceptionDetailsFromChain(Throwable e,
77+
String errorReason, String errorMessage, ErrorType errorType) {
78+
List<Throwable> causalChain = Throwables.getCausalChain(e);
79+
Throwable nonTerminalException = null;
80+
for (Throwable t : causalChain) {
81+
if (t instanceof ProgramFailureException) {
82+
return null; // Avoid multiple wrap
83+
}
84+
if (NON_TERMINAL_EXCEPTIONS.containsKey(t.getClass().getName())) {
85+
nonTerminalException = t; // Store non-terminal exception as fallback
86+
continue;
87+
}
88+
String errorSubCategory = TERMINAL_EXCEPTIONS.get(t.getClass().getName());
89+
if (errorSubCategory != null) {
90+
return getProgramFailureException(t, errorReason, errorSubCategory);
91+
}
92+
}
93+
94+
if (nonTerminalException != null) {
95+
return getProgramFailureException(nonTerminalException, errorReason,
96+
NON_TERMINAL_EXCEPTIONS.get(nonTerminalException.getClass().getName()));
97+
}
98+
99+
return ErrorUtils.getProgramFailureException(
100+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage,
101+
errorType, false, e);
102+
}
103+
104+
/**
105+
* Constructs a ProgramFailureException using the provided exception details.
106+
*
107+
* @param exception the exception to wrap
108+
* @param errorSubCategory specific subcategory of the error
109+
* @return a new ProgramFailureException with the extracted details
110+
*/
111+
private static ProgramFailureException getProgramFailureException(Throwable exception,
112+
String errorReason, String errorSubCategory) {
113+
String errorMessage = exception.getMessage();
114+
return ErrorUtils.getProgramFailureException(
115+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN, errorSubCategory), errorReason,
116+
errorMessage, ErrorType.USER, false, exception);
117+
}
118+
}

0 commit comments

Comments
 (0)