Skip to content

Commit 5b6fb26

Browse files
committed
Add MysqlErrorDetailsProvider
1 parent 81c8853 commit 5b6fb26

File tree

15 files changed

+286
-24
lines changed

15 files changed

+286
-24
lines changed
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright © 2024 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db;
18+
19+
import com.google.common.base.Throwables;
20+
import io.cdap.cdap.api.exception.ErrorCategory;
21+
import io.cdap.cdap.api.exception.ErrorType;
22+
import io.cdap.cdap.api.exception.ErrorUtils;
23+
import io.cdap.cdap.api.exception.ProgramFailureException;
24+
import io.cdap.cdap.etl.api.exception.ErrorContext;
25+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
26+
27+
import java.sql.SQLException;
28+
import java.util.List;
29+
30+
/**
31+
* A custom ErrorDetailsProvider for Database plugins.
32+
*/
33+
public class DBErrorDetailsProvider implements ErrorDetailsProvider {
34+
35+
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
36+
List<Throwable> causalChain = Throwables.getCausalChain(e);
37+
for (Throwable t : causalChain) {
38+
if (t instanceof ProgramFailureException) {
39+
// if causal chain already has program failure exception, return null to avoid double wrap.
40+
return null;
41+
}
42+
if (t instanceof SQLException) {
43+
return getProgramFailureException((SQLException) t, errorContext);
44+
}
45+
if (t instanceof IllegalArgumentException) {
46+
return getProgramFailureException((IllegalArgumentException) t, errorContext);
47+
}
48+
if (t instanceof IllegalStateException) {
49+
return getProgramFailureException((IllegalStateException) t, errorContext);
50+
}
51+
52+
}
53+
return null;
54+
}
55+
56+
/**
57+
* Get a ProgramFailureException with the given error
58+
* information from {@link SQLException}.
59+
*
60+
* @param e The SQLException to get the error information from.
61+
* @return A ProgramFailureException with the given error information.
62+
*/
63+
private ProgramFailureException getProgramFailureException(SQLException e, ErrorContext errorContext) {
64+
String errorMessage = e.getMessage();
65+
String sqlState = e.getSQLState();
66+
int errorCode = e.getErrorCode();
67+
String errorMessageWithDetails = String.format(
68+
"Error occurred in the phase: '%s'. Error message: '%s'. Error code: '%s'. sqlState: '%s'",
69+
errorContext.getPhase(), errorMessage, errorCode, sqlState);
70+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
71+
errorMessage, errorMessageWithDetails, getErrorTypeFromErrorCode(errorCode), false, e);
72+
}
73+
74+
/**
75+
* Get a ProgramFailureException with the given error
76+
* information from {@link IllegalArgumentException}.
77+
*
78+
* @param e The IllegalArgumentException to get the error information from.
79+
* @return A ProgramFailureException with the given error information.
80+
*/
81+
private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
82+
String errorMessage = e.getMessage();
83+
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
84+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
85+
errorMessage,
86+
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e);
87+
}
88+
89+
/**
90+
* Get a ProgramFailureException with the given error
91+
* information from {@link IllegalStateException}.
92+
*
93+
* @param e The IllegalStateException to get the error information from.
94+
* @return A ProgramFailureException with the given error information.
95+
*/
96+
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
97+
String errorMessage = e.getMessage();
98+
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
99+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
100+
errorMessage,
101+
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
102+
103+
}
104+
105+
/**
106+
* Get the external documentation link for the client errors if available.
107+
*
108+
* @return The external documentation link as a {@link String}.
109+
*/
110+
protected String getExternalDocumentationLink() {
111+
return null;
112+
}
113+
114+
protected ErrorType getErrorTypeFromErrorCode(int errorCode) {
115+
return ErrorType.UNKNOWN;
116+
}
117+
}

database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import io.cdap.cdap.api.common.Bytes;
2222
import io.cdap.cdap.api.data.format.StructuredRecord;
2323
import io.cdap.cdap.api.data.schema.Schema;
24+
import io.cdap.cdap.api.exception.ErrorCategory;
25+
import io.cdap.cdap.api.exception.ErrorType;
26+
import io.cdap.cdap.api.exception.ErrorUtils;
2427
import io.cdap.plugin.util.DBUtils;
2528
import io.cdap.plugin.util.Lazy;
2629
import org.apache.hadoop.conf.Configurable;
@@ -305,7 +308,10 @@ protected void updateOperation(PreparedStatement stmt) throws SQLException {
305308
* @throws SQLException
306309
*/
307310
protected void upsertOperation(PreparedStatement stmt) throws SQLException {
308-
throw new UnsupportedOperationException();
311+
String errorMessage = "Upsert operation is not supported for this plugin.";
312+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
313+
errorMessage, errorMessage, ErrorType.SYSTEM, false, new UnsupportedOperationException(errorMessage));
314+
309315
}
310316

311317
private boolean fillUpdateParams(List<String> updatedKeyList, ColumnType columnType) {
@@ -366,7 +372,9 @@ private void writeToDataOut(DataOutput out, Schema.Field field) throws IOExcepti
366372
out.write((byte[]) fieldValue);
367373
break;
368374
default:
369-
throw new IOException(String.format("Unsupported datatype: %s with value: %s.", fieldType, fieldValue));
375+
String errorMessage = String.format("Unsupported datatype: %s with value: %s.", fieldType, fieldValue);
376+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
377+
errorMessage, errorMessage, ErrorType.USER, false, new IOException(errorMessage));
370378
}
371379
}
372380

database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import io.cdap.cdap.etl.api.StageConfigurer;
3333
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3434
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
35-
import io.cdap.cdap.etl.api.validation.InvalidStageException;
35+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3636
import io.cdap.plugin.common.LineageRecorder;
3737
import io.cdap.plugin.common.ReferenceBatchSink;
3838
import io.cdap.plugin.common.ReferencePluginConfig;
@@ -42,6 +42,7 @@
4242
import io.cdap.plugin.db.ConnectionConfig;
4343
import io.cdap.plugin.db.ConnectionConfigAccessor;
4444
import io.cdap.plugin.db.DBConfig;
45+
import io.cdap.plugin.db.DBErrorDetailsProvider;
4546
import io.cdap.plugin.db.DBRecord;
4647
import io.cdap.plugin.db.Operation;
4748
import io.cdap.plugin.db.SchemaReader;
@@ -163,12 +164,22 @@ public void validateOperations(FailureCollector collector, T dbSinkConfig, @Null
163164
}
164165
}
165166

167+
/**
168+
* Returns the ErrorDetailsProvider class name.
169+
* Override this method to provide a custom ErrorDetailsProvider class name.
170+
*
171+
* @return ErrorDetailsProvider class name
172+
*/
173+
protected String getErrorDetailsProviderClassName() {
174+
return DBErrorDetailsProvider.class.getName();
175+
}
176+
166177
@Override
167178
public void prepareRun(BatchSinkContext context) {
168179
String connectionString = dbSinkConfig.getConnectionString();
169180
String dbSchemaName = dbSinkConfig.getDBSchemaName();
170181
String tableName = dbSinkConfig.getTableName();
171-
182+
FailureCollector collector = context.getFailureCollector();
172183
LOG.debug("tableName = {}; schemaName = {}, pluginType = {}; pluginName = {}; connectionString = {};",
173184
tableName,
174185
dbSchemaName,
@@ -183,13 +194,12 @@ public void prepareRun(BatchSinkContext context) {
183194
// make sure that the destination table exists and column types are correct
184195
try {
185196
if (Objects.nonNull(outputSchema)) {
186-
FailureCollector collector = context.getFailureCollector();
187197
validateOperations(collector, dbSinkConfig, outputSchema);
188198
validateSchema(collector, driverClass, tableName,
189199
outputSchema, dbSchemaName);
190200
collector.getOrThrowException();
191201
} else {
192-
outputSchema = inferSchema(driverClass);
202+
outputSchema = inferSchema(driverClass, collector);
193203
}
194204
} finally {
195205
DBUtils.cleanup(driverClass);
@@ -229,6 +239,9 @@ public void prepareRun(BatchSinkContext context) {
229239
}
230240

231241
addOutputContext(context);
242+
243+
// set error details provider
244+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
232245
}
233246
protected void addOutputContext(BatchSinkContext context) {
234247
context.addOutput(Output.of(dbSinkConfig.getReferenceName(),
@@ -257,13 +270,15 @@ protected void setColumnsInfo(List<Schema.Field> fields) {
257270
public void initialize(BatchRuntimeContext context) throws Exception {
258271
super.initialize(context);
259272
driverClass = context.loadPluginClass(getJDBCPluginId());
260-
Schema outputSchema = Optional.ofNullable(context.getInputSchema()).orElse(inferSchema(driverClass));
273+
FailureCollector failureCollector = context.getFailureCollector();
274+
Schema outputSchema = Optional.ofNullable(context.getInputSchema()).orElse(inferSchema(driverClass,
275+
failureCollector));
261276

262277
setColumnsInfo(outputSchema.getFields());
263278
setResultSetMetadata();
264279
}
265280

266-
private Schema inferSchema(Class<? extends Driver> driverClass) {
281+
private Schema inferSchema(Class<? extends Driver> driverClass, FailureCollector collector) {
267282
List<Schema.Field> inferredFields = new ArrayList<>();
268283
String dbSchemaName = dbSinkConfig.getDBSchemaName();
269284
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
@@ -283,11 +298,16 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
283298
inferredFields.addAll(getSchemaReader().getSchemaFields(rs));
284299
}
285300
} catch (SQLException e) {
286-
throw new InvalidStageException("Error while reading table metadata", e);
287-
301+
String errorReason = "Error while reading table metadata";
302+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
303+
.withStacktrace(e.getStackTrace());
304+
collector.getOrThrowException();
288305
}
289306
} catch (IllegalAccessException | InstantiationException | SQLException e) {
290-
throw new InvalidStageException("JDBC Driver unavailable: " + dbSinkConfig.getJdbcPluginName(), e);
307+
String errorReason = String.format("JDBC Driver unavailable: %s", dbSinkConfig.getJdbcPluginName());
308+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
309+
.withStacktrace(e.getStackTrace());
310+
collector.getOrThrowException();
291311
}
292312
return Schema.recordOf("inferredSchema", inferredFields);
293313
}

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
import io.cdap.cdap.api.data.format.StructuredRecord;
2626
import io.cdap.cdap.api.data.schema.Schema;
2727
import io.cdap.cdap.api.dataset.lib.KeyValue;
28+
import io.cdap.cdap.api.exception.ErrorCategory;
29+
import io.cdap.cdap.api.exception.ErrorType;
30+
import io.cdap.cdap.api.exception.ErrorUtils;
2831
import io.cdap.cdap.api.plugin.PluginConfig;
2932
import io.cdap.cdap.etl.api.Emitter;
3033
import io.cdap.cdap.etl.api.FailureCollector;
3134
import io.cdap.cdap.etl.api.PipelineConfigurer;
3235
import io.cdap.cdap.etl.api.StageConfigurer;
3336
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3437
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
38+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3539
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
3640
import io.cdap.plugin.common.LineageRecorder;
3741
import io.cdap.plugin.common.ReferenceBatchSource;
@@ -41,6 +45,7 @@
4145
import io.cdap.plugin.db.ConnectionConfig;
4246
import io.cdap.plugin.db.ConnectionConfigAccessor;
4347
import io.cdap.plugin.db.DBConfig;
48+
import io.cdap.plugin.db.DBErrorDetailsProvider;
4449
import io.cdap.plugin.db.DBRecord;
4550
import io.cdap.plugin.db.SchemaReader;
4651
import io.cdap.plugin.db.TransactionIsolationLevel;
@@ -114,7 +119,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
114119

115120
if (sourceConfig.canConnect()) {
116121
try {
117-
stageConfigurer.setOutputSchema(getSchema(driverClass));
122+
stageConfigurer.setOutputSchema(getSchema(driverClass, collector));
118123
} catch (IllegalAccessException | InstantiationException e) {
119124
collector.addFailure("Unable to instantiate JDBC driver: " + e.getMessage(), null)
120125
.withStacktrace(e.getStackTrace());
@@ -127,8 +132,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
127132
}
128133
}
129134

130-
public Schema getSchema(Class<? extends Driver> driverClass) throws IllegalAccessException,
131-
SQLException, InstantiationException {
135+
public Schema getSchema(Class<? extends Driver> driverClass, FailureCollector collector)
136+
throws IllegalAccessException, SQLException, InstantiationException {
132137
DriverCleanup driverCleanup;
133138
try {
134139

@@ -140,8 +145,10 @@ public Schema getSchema(Class<? extends Driver> driverClass) throws IllegalAcces
140145
}
141146
} catch (Exception e) {
142147
LOG.error("Exception while performing getSchema", e);
143-
throw e;
148+
collector.addFailure(e.getMessage(), null).withStacktrace(e.getStackTrace());
149+
collector.getOrThrowException();
144150
}
151+
return null;
145152
}
146153

147154
/**
@@ -194,7 +201,11 @@ private Schema loadSchemaFromDB(Class<? extends Driver> driverClass)
194201

195202
} catch (SQLException e) {
196203
// wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath
197-
throw new SQLException(e.getMessage(), e.getSQLState(), e.getErrorCode());
204+
String errorMessageWithDetails = String.format("Error occurred while trying to get schema from database." +
205+
"Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState());
206+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
207+
e.getMessage(), errorMessageWithDetails, ErrorType.USER, false, new SQLException(e.getMessage(),
208+
e.getSQLState(), e.getErrorCode()));
198209
} finally {
199210
driverCleanup.destroy();
200211
}
@@ -212,6 +223,16 @@ protected SchemaReader getSchemaReader() {
212223
return new CommonSchemaReader();
213224
}
214225

226+
/**
227+
* Returns the ErrorDetailsProvider class name.
228+
* Override this method to provide a custom ErrorDetailsProvider class name.
229+
*
230+
* @return ErrorDetailsProvider class name
231+
*/
232+
protected String getErrorDetailsProviderClassName() {
233+
return DBErrorDetailsProvider.class.getName();
234+
}
235+
215236
private DriverCleanup loadPluginClassAndGetDriver(Class<? extends Driver> driverClass)
216237
throws IllegalAccessException, InstantiationException, SQLException {
217238

@@ -270,6 +291,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {
270291
}
271292
context.setInput(Input.of(sourceConfig.getReferenceName(), new SourceInputFormatProvider(
272293
DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration())));
294+
295+
// set error details provider
296+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
273297
}
274298

275299
/**

database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package io.cdap.plugin.util;
1818

19+
import io.cdap.cdap.api.exception.ErrorCategory;
20+
import io.cdap.cdap.api.exception.ErrorType;
21+
import io.cdap.cdap.api.exception.ErrorUtils;
1922
import io.cdap.cdap.api.plugin.PluginConfig;
2023
import io.cdap.cdap.api.plugin.PluginProperties;
2124
import io.cdap.cdap.etl.api.FailureCollector;
@@ -285,6 +288,8 @@ public static String constructFQN(String dbType, String host, int port, String d
285288
}
286289

287290
private DBUtils() {
288-
throw new AssertionError("Should not instantiate static utility class.");
291+
String errorMessage = "Should not instantiate static utility class.";
292+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
293+
errorMessage, errorMessage, ErrorType.SYSTEM, false, new AssertionError(errorMessage));
289294
}
290295
}

database-commons/src/test/java/io/cdap/plugin/db/CustomAssertions.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package io.cdap.plugin.db;
1818

19+
import io.cdap.cdap.api.exception.ErrorCategory;
20+
import io.cdap.cdap.api.exception.ErrorType;
21+
import io.cdap.cdap.api.exception.ErrorUtils;
1922
import org.junit.Assert;
2023

2124
/**
@@ -30,7 +33,9 @@ public final class CustomAssertions {
3033
public static final double DELTA = 0.000001;
3134

3235
private CustomAssertions() {
33-
throw new AssertionError("Should not instantiate static utility class.");
36+
String errorMessage = "Should not instantiate static utility class.";
37+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
38+
errorMessage, errorMessage, ErrorType.SYSTEM, false, new AssertionError(errorMessage));
3439
}
3540

3641
/**

0 commit comments

Comments
 (0)