Skip to content

Commit b492572

Browse files
authored
Merge pull request #57 from data-integrations/feature/plugin-validation
[CDAP-15787] Add failure collector to database plugins
2 parents 4bfdb6d + b4eb4ca commit b492572

File tree

10 files changed

+134
-87
lines changed

10 files changed

+134
-87
lines changed

database-commons/src/main/java/io/cdap/plugin/db/batch/TransactionIsolationLevel.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
package io.cdap.plugin.db.batch;
1818

1919

20+
import io.cdap.cdap.etl.api.FailureCollector;
21+
import io.cdap.plugin.db.DBConfig;
22+
import io.cdap.plugin.db.batch.sink.AbstractDBSink;
23+
2024
import java.sql.Connection;
2125
import java.util.Arrays;
2226
import javax.annotation.Nullable;
@@ -69,17 +73,20 @@ public static int getLevel(@Nullable String level) {
6973
* Validates that the given level is either null or one of the possible transaction isolation levels.
7074
*
7175
* @param level the level to check
76+
* @param collector failure collector
7277
*/
73-
public static void validate(@Nullable String level) {
78+
public static void validate(@Nullable String level, FailureCollector collector) {
7479
if (level == null) {
7580
return;
7681
}
7782
try {
7883
Level.valueOf(level.toUpperCase());
7984
} catch (IllegalArgumentException e) {
80-
throw new IllegalArgumentException(String.format(
81-
"Transaction isolation level must be one of the following values: %s, but got: %s.",
82-
Arrays.toString(Level.values()), level));
85+
collector.addFailure(
86+
"Unsupported Transaction Isolation Level.",
87+
String.format("Transaction Isolation Level must be one of the following values: %s",
88+
Arrays.toString(Level.values())))
89+
.withConfigProperty(AbstractDBSink.DBSinkConfig.TRANSACTION_ISOLATION_LEVEL);
8390
}
8491
}
8592
}

database-commons/src/main/java/io/cdap/plugin/db/batch/action/AbstractQueryAction.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.db.batch.action;
1818

19+
import io.cdap.cdap.etl.api.FailureCollector;
1920
import io.cdap.cdap.etl.api.PipelineConfigurer;
2021
import io.cdap.cdap.etl.api.batch.BatchActionContext;
2122
import io.cdap.cdap.etl.api.batch.PostAction;
@@ -40,7 +41,9 @@ public AbstractQueryAction(QueryActionConfig config, Boolean enableAutoCommit) {
4041

4142
@Override
4243
public void run(BatchActionContext batchContext) throws Exception {
43-
config.validate();
44+
FailureCollector collector = batchContext.getFailureCollector();
45+
config.validate(collector);
46+
collector.getOrThrowException();
4447

4548
if (!config.shouldRun(batchContext)) {
4649
return;
@@ -53,7 +56,8 @@ public void run(BatchActionContext batchContext) throws Exception {
5356

5457
@Override
5558
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
56-
config.validate();
59+
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
60+
config.validate(collector);
5761
DBUtils.validateJDBCPluginPipeline(pipelineConfigurer, config, JDBC_PLUGIN_ID);
5862
}
5963
}

database-commons/src/main/java/io/cdap/plugin/db/batch/action/QueryActionConfig.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.cdap.cdap.api.annotation.Description;
2020
import io.cdap.cdap.api.annotation.Macro;
21+
import io.cdap.cdap.etl.api.FailureCollector;
2122
import io.cdap.cdap.etl.api.batch.BatchActionContext;
2223
import io.cdap.plugin.common.batch.action.Condition;
2324
import io.cdap.plugin.common.batch.action.ConditionConfig;
@@ -43,10 +44,14 @@ public QueryActionConfig() {
4344
runCondition = Condition.SUCCESS.name();
4445
}
4546

46-
public void validate() {
47+
public void validate(FailureCollector collector) {
4748
// have to delegate instead of inherit, since we can't extend both ConditionConfig and ConnectionConfig.
48-
if (!containsMacro("runCondition")) {
49-
new ConditionConfig(runCondition).validate();
49+
if (!containsMacro(RUN_CONDITION)) {
50+
try {
51+
new ConditionConfig(runCondition).validate();
52+
} catch (IllegalArgumentException e) {
53+
collector.addFailure(e.getMessage(), null).withConfigProperty(RUN_CONDITION);
54+
}
5055
}
5156
}
5257

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import io.cdap.cdap.api.dataset.lib.KeyValue;
2929
import io.cdap.cdap.api.plugin.PluginConfig;
3030
import io.cdap.cdap.etl.api.Emitter;
31+
import io.cdap.cdap.etl.api.FailureCollector;
3132
import io.cdap.cdap.etl.api.PipelineConfigurer;
33+
import io.cdap.cdap.etl.api.StageConfigurer;
3234
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3335
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
3436
import io.cdap.cdap.etl.api.validation.InvalidStageException;
@@ -64,6 +66,7 @@
6466
import java.util.Objects;
6567
import java.util.Optional;
6668
import java.util.Properties;
69+
import java.util.Set;
6770
import java.util.stream.Collectors;
6871

6972
/**
@@ -91,12 +94,16 @@ private String getJDBCPluginId() {
9194
@Override
9295
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
9396
super.configurePipeline(pipelineConfigurer);
97+
StageConfigurer configurer = pipelineConfigurer.getStageConfigurer();
9498
DBUtils.validateJDBCPluginPipeline(pipelineConfigurer, dbSinkConfig, getJDBCPluginId());
95-
Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
99+
Schema inputSchema = configurer.getInputSchema();
96100
if (Objects.nonNull(inputSchema)) {
97101
Class<? extends Driver> driverClass = DBUtils.getDriverClass(
98102
pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
99-
validateSchema(driverClass, dbSinkConfig.tableName, inputSchema);
103+
if (driverClass != null) {
104+
FailureCollector collector = configurer.getFailureCollector();
105+
validateSchema(collector, driverClass, dbSinkConfig.tableName, inputSchema);
106+
}
100107
}
101108
}
102109

@@ -117,7 +124,9 @@ public void prepareRun(BatchSinkContext context) {
117124
// make sure that the destination table exists and column types are correct
118125
try {
119126
if (Objects.nonNull(outputSchema)) {
120-
validateSchema(driverClass, dbSinkConfig.tableName, outputSchema);
127+
FailureCollector collector = context.getFailureCollector();
128+
validateSchema(collector, driverClass, dbSinkConfig.tableName, outputSchema);
129+
collector.getOrThrowException();
121130
} else {
122131
outputSchema = inferSchema(driverClass);
123132
}
@@ -269,15 +278,17 @@ static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMeta
269278
return columnTypes;
270279
}
271280

272-
private void validateSchema(Class<? extends Driver> jdbcDriverClass, String tableName, Schema inputSchema) {
281+
private void validateSchema(FailureCollector collector, Class<? extends Driver> jdbcDriverClass, String tableName,
282+
Schema inputSchema) {
273283
String connectionString = dbSinkConfig.getConnectionString();
274284

275285
try {
276286
DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.jdbcPluginName);
277287
} catch (IllegalAccessException | InstantiationException | SQLException e) {
278-
throw new InvalidStageException(String.format("Unable to load or register JDBC driver '%s' while checking for " +
279-
"the existence of the database table '%s'.",
280-
jdbcDriverClass, tableName), e);
288+
collector.addFailure(String.format("Unable to load or register JDBC driver '%s' while checking for " +
289+
"the existence of the database table '%s'.",
290+
jdbcDriverClass, tableName), null).withStacktrace(e.getStackTrace());
291+
throw collector.getOrThrowException();
281292
}
282293

283294
Properties connectionProperties = new Properties();
@@ -286,23 +297,26 @@ private void validateSchema(Class<? extends Driver> jdbcDriverClass, String tabl
286297
executeInitQueries(connection, dbSinkConfig.getInitQueries());
287298
try (ResultSet tables = connection.getMetaData().getTables(null, null, tableName, null)) {
288299
if (!tables.next()) {
289-
throw new InvalidStageException("Table " + tableName + " does not exist. " +
290-
"Please check that the 'tableName' property has been set correctly, " +
291-
"and that the connection string " + connectionString +
292-
"points to a valid database.");
300+
collector.addFailure(
301+
String.format("Table '%s' does not exist.", tableName),
302+
String.format("Ensure table '%s' is set correctly and that the connection string '%s' points " +
303+
"to a valid database.", tableName, connectionString))
304+
.withConfigProperty(DBSinkConfig.TABLE_NAME);
305+
return;
293306
}
294307
}
295308

296309
try (PreparedStatement pStmt = connection.prepareStatement("SELECT * FROM " + dbSinkConfig.getEscapedTableName()
297310
+ " WHERE 1 = 0");
298311
ResultSet rs = pStmt.executeQuery()) {
299-
getFieldsValidator().validateFields(inputSchema, rs);
312+
getFieldsValidator().validateFields(inputSchema, rs, collector);
300313
}
301-
302314
} catch (SQLException e) {
303315
LOG.error("Exception while trying to validate schema of database table {} for connection {}.",
304316
tableName, connectionString, e);
305-
throw Throwables.propagate(e);
317+
collector.addFailure(
318+
String.format("Exception while trying to validate schema of database table '%s' for connection '%s'.",
319+
tableName, connectionString), null).withStacktrace(e.getStackTrace());
306320
}
307321
}
308322

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/CommonFieldsValidator.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@
1818

1919
import com.google.common.base.Preconditions;
2020
import io.cdap.cdap.api.data.schema.Schema;
21+
import io.cdap.cdap.etl.api.FailureCollector;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

2425
import java.sql.ResultSet;
2526
import java.sql.ResultSetMetaData;
2627
import java.sql.SQLException;
2728
import java.sql.Types;
28-
import java.util.HashSet;
29-
import java.util.Set;
3029

3130
/**
3231
* Common fields validator.
@@ -35,37 +34,29 @@ public class CommonFieldsValidator implements FieldsValidator {
3534
protected static final Logger LOG = LoggerFactory.getLogger(CommonFieldsValidator.class);
3635

3736
@Override
38-
public void validateFields(Schema inputSchema, ResultSet resultSet) throws SQLException {
37+
public void validateFields(Schema inputSchema, ResultSet resultSet, FailureCollector collector) throws SQLException {
3938
ResultSetMetaData rsMetaData = resultSet.getMetaData();
40-
4139
Preconditions.checkNotNull(inputSchema.getFields());
42-
Set<String> invalidFields = new HashSet<>();
4340
for (Schema.Field field : inputSchema.getFields()) {
4441
int columnIndex = resultSet.findColumn(field.getName());
4542
boolean isColumnNullable = (ResultSetMetaData.columnNullable == rsMetaData.isNullable(columnIndex));
4643
boolean isNotNullAssignable = !isColumnNullable && field.getSchema().isNullable();
44+
String name = field.getName();
4745
if (isNotNullAssignable) {
48-
LOG.error("Field '{}' was given as nullable but the database column is not nullable", field.getName());
49-
invalidFields.add(field.getName());
46+
collector.addFailure(
47+
String.format("Field '%s' was given as nullable but database column is not nullable.", name),
48+
"Ensure that the field is not nullable.").withInputSchemaField(name);
5049
}
5150

5251
if (!isFieldCompatible(field, rsMetaData, columnIndex)) {
5352
String sqlTypeName = rsMetaData.getColumnTypeName(columnIndex);
5453
Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
55-
Schema.Type fieldType = fieldSchema.getType();
56-
Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
57-
LOG.error("Field '{}' was given as type '{}' but the database column is actually of type '{}'.",
58-
field.getName(),
59-
fieldLogicalType != null ? fieldLogicalType.getToken() : fieldType,
60-
sqlTypeName
61-
);
62-
invalidFields.add(field.getName());
54+
collector.addFailure(
55+
String.format("Field '%s' was given as type '%s' but the database column is actually of type '%s'.",
56+
name, fieldSchema.getDisplayName(), sqlTypeName),
57+
"Ensure that the field is not nullable.").withInputSchemaField(name);
6358
}
6459
}
65-
66-
Preconditions.checkArgument(invalidFields.isEmpty(),
67-
"Couldn't find matching database column(s) for input field(s) '%s'.",
68-
String.join(",", invalidFields));
6960
}
7061

7162
/**

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/FieldsValidator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package io.cdap.plugin.db.batch.sink;
1818

1919
import io.cdap.cdap.api.data.schema.Schema;
20+
import io.cdap.cdap.etl.api.FailureCollector;
2021

2122
import java.sql.ResultSet;
2223
import java.sql.ResultSetMetaData;
2324
import java.sql.SQLException;
25+
import java.util.Set;
2426

2527
/**
2628
* Main Interface to validate db fields.
@@ -33,7 +35,7 @@ public interface FieldsValidator {
3335
* @param inputSchema input schema.
3436
* @param resultSet resultSet with database fields.
3537
*/
36-
void validateFields(Schema inputSchema, ResultSet resultSet) throws SQLException;
38+
void validateFields(Schema inputSchema, ResultSet resultSet, FailureCollector collector) throws SQLException;
3739

3840
/**
3941
* Checks if field is compatible to be written into database column of the given sql index.

database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.db.batch.source;
1818

19+
import com.google.common.annotations.VisibleForTesting;
1920
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
@@ -26,7 +27,9 @@
2627
import io.cdap.cdap.api.dataset.lib.KeyValue;
2728
import io.cdap.cdap.api.plugin.PluginConfig;
2829
import io.cdap.cdap.etl.api.Emitter;
30+
import io.cdap.cdap.etl.api.FailureCollector;
2931
import io.cdap.cdap.etl.api.PipelineConfigurer;
32+
import io.cdap.cdap.etl.api.StageConfigurer;
3033
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3134
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
3235
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
@@ -102,16 +105,22 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
102105
Class<? extends Driver> driverClass = DBUtils.getDriverClass(
103106
pipelineConfigurer, sourceConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
104107

105-
sourceConfig.validate();
108+
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
109+
FailureCollector collector = stageConfigurer.getFailureCollector();
110+
sourceConfig.validate(collector);
106111
if (!Strings.isNullOrEmpty(sourceConfig.schema)) {
107-
pipelineConfigurer.getStageConfigurer().setOutputSchema(sourceConfig.getSchema());
112+
stageConfigurer.setOutputSchema(sourceConfig.getSchema());
108113
} else if (sourceConfig.query != null) {
109114
try {
110-
pipelineConfigurer.getStageConfigurer().setOutputSchema(getSchema(driverClass));
115+
stageConfigurer.setOutputSchema(getSchema(driverClass));
111116
} catch (IllegalAccessException | InstantiationException e) {
112-
throw new InvalidStageException("Unable to instantiate JDBC driver: " + e.getMessage(), e);
117+
collector.addFailure("Unable to instantiate JDBC driver: " + e.getMessage(), null)
118+
.withStacktrace(e.getStackTrace());
113119
} catch (SQLException e) {
114-
throw new IllegalArgumentException("SQL error while getting query schema: " + e.getMessage(), e);
120+
collector.addFailure("SQL error while getting query schema: " + e.getMessage(), null)
121+
.withStacktrace(e.getStackTrace());
122+
} catch (Exception e) {
123+
collector.addFailure(e.getMessage(), null).withStacktrace(e.getStackTrace());
115124
}
116125
}
117126
}
@@ -205,7 +214,9 @@ private Connection getConnection() throws SQLException {
205214

206215
@Override
207216
public void prepareRun(BatchSourceContext context) throws Exception {
208-
sourceConfig.validate();
217+
FailureCollector collector = context.getFailureCollector();
218+
sourceConfig.validate(collector);
219+
collector.getOrThrowException();
209220

210221
String connectionString = sourceConfig.getConnectionString();
211222

@@ -349,43 +360,48 @@ private String getBoundingQuery() {
349360
return cleanQuery(boundingQuery);
350361
}
351362

352-
private void validate() {
363+
private void validate(FailureCollector collector) {
353364
boolean hasOneSplit = false;
354-
if (!containsMacro("numSplits") && numSplits != null) {
365+
if (!containsMacro(NUM_SPLITS) && numSplits != null) {
355366
if (numSplits < 1) {
356-
throw new IllegalArgumentException(
357-
"Invalid value for numSplits. Must be at least 1, but got " + numSplits);
367+
collector.addFailure(
368+
String.format("Invalid value for numSplits '%d'. Must be at least 1.", numSplits), null)
369+
.withConfigProperty(NUM_SPLITS);
358370
}
359371
if (numSplits == 1) {
360372
hasOneSplit = true;
361373
}
362374
}
363375

364376
if (getTransactionIsolationLevel() != null) {
365-
TransactionIsolationLevel.validate(getTransactionIsolationLevel());
377+
TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
366378
}
367379

368380
if (!hasOneSplit && !containsMacro("importQuery") && !getImportQuery().contains("$CONDITIONS")) {
369-
throw new IllegalArgumentException(String.format("Import Query %s must contain the string '$CONDITIONS'.",
370-
importQuery));
381+
collector.addFailure("Invalid Import Query.",
382+
String.format("Import Query %s must contain the string '$CONDITIONS'.", importQuery))
383+
.withConfigProperty(IMPORT_QUERY);
371384
}
372385

373386
if (!hasOneSplit && !containsMacro("splitBy") && (splitBy == null || splitBy.isEmpty())) {
374-
throw new IllegalArgumentException("The splitBy must be specified if numSplits is not set to 1.");
387+
collector.addFailure("Split-By Field Name must be specified if Number of Splits is not set to 1.",
388+
null).withConfigProperty(SPLIT_BY).withConfigProperty(NUM_SPLITS);
375389
}
376390

377391
if (!hasOneSplit && !containsMacro("boundingQuery") && (boundingQuery == null || boundingQuery.isEmpty())) {
378-
throw new IllegalArgumentException("The boundingQuery must be specified if numSplits is not set to 1.");
392+
collector.addFailure("Bounding Query must be specified if Number of Splits is not set to 1.", null)
393+
.withConfigProperty(BOUNDING_QUERY).withConfigProperty(NUM_SPLITS);
379394
}
380-
381395
}
382396

383397
private void validateSchema(Schema actualSchema) {
384398
validateSchema(actualSchema, getSchema());
385399
}
386400

401+
@VisibleForTesting
387402
static void validateSchema(Schema actualSchema, Schema configSchema) {
388403
if (configSchema == null) {
404+
389405
throw new InvalidConfigPropertyException("Schema should not be null or empty", SCHEMA);
390406
}
391407
for (Schema.Field field : configSchema.getFields()) {

0 commit comments

Comments (0)