
Commit 2d94f88

CDAP-15518 input schema validation - add input schema validation, with test coverage and tests for attempts to write invalid data types
1 parent 78d5594 · commit 2d94f88

File tree

11 files changed: +535 −104 lines

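In short, the commit teaches AbstractDBSink to validate the pipeline's input schema against the destination table at configure time, matching CDAP logical types as well as physical types, and adds tests that deploy pipelines with deliberately mismatched schemas. A minimal sketch of the mismatches the new tests exercise, using the CDAP Schema API (column names mirror the test tables, record names are arbitrary, and the usual TIMESTAMP to TIMESTAMP_MICROS column mapping is assumed):

    import io.cdap.cdap.api.data.schema.Schema;

    public class SchemaMismatchExamples {
      public static void main(String[] args) {
        // Physical-type mismatch: the ID column is an INT, so declaring the
        // field as STRING now fails at pipeline deployment instead of at runtime.
        Schema invalidType = Schema.recordOf(
          "wrongDBRecord",
          Schema.Field.of("ID", Schema.of(Schema.Type.STRING)));

        // Logical-type mismatch: a TIMESTAMP column surfaces as the
        // TIMESTAMP_MICROS logical type (physically a LONG). A plain LONG
        // field used to pass validation; after this commit it is rejected.
        Schema invalidLogicalType = Schema.recordOf(
          "wrongDBRecord",
          Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.Type.LONG)));

        // The matching declaration that validation accepts.
        Schema validLogicalType = Schema.recordOf(
          "rightDBRecord",
          Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)));

        System.out.println(invalidType + "\n" + invalidLogicalType + "\n" + validLogicalType);
      }
    }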

aurora-mysql-plugin/src/test/java/io/cdap/plugin/aurora/mysql/AuroraMysqlSinkTestRun.java

Lines changed: 43 additions & 9 deletions
@@ -31,6 +31,7 @@
 import io.cdap.plugin.common.Constants;
 import io.cdap.plugin.db.batch.sink.AbstractDBSink;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.math.BigDecimal;
@@ -54,20 +55,41 @@
  */
 public class AuroraMysqlSinkTestRun extends AuroraMysqlPluginTestBase {
 
+  @Before
+  public void setup() throws Exception {
+    try (Connection connection = createConnection();
+         Statement stmt = connection.createStatement()) {
+      stmt.execute("TRUNCATE TABLE MY_DEST_TABLE");
+    }
+  }
+
+  @Test
+  public void testDBSinkWithInvalidFieldType() throws Exception {
+    testDBInvalidFieldType("ID", Schema.Type.STRING, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+  }
+
+  @Test
+  public void testDBSinkWithInvalidFieldLogicalType() throws Exception {
+    testDBInvalidFieldLogicalType("TIMESTAMP_COL", Schema.Type.LONG, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+  }
+
+  @Test
+  public void testDBSinkWithDBSchemaAndInvalidData() throws Exception {
+    String stringColumnName = "NAME";
+    startPipelineAndWriteInvalidData(stringColumnName, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+    try (Connection conn = createConnection();
+         Statement stmt = conn.createStatement();
+         ResultSet resultSet = stmt.executeQuery("SELECT * FROM MY_DEST_TABLE")) {
+      testInvalidDataWrite(resultSet, stringColumnName);
+    }
+  }
+
   @Test
   public void testDBSink() throws Exception {
     String inputDatasetName = "input-dbsinktest";
 
     ETLPlugin sourceConfig = MockSource.getPlugin(inputDatasetName);
-    ETLPlugin sinkConfig = new ETLPlugin(
-      AuroraMysqlConstants.PLUGIN_NAME,
-      BatchSink.PLUGIN_TYPE,
-      ImmutableMap.<String, String>builder()
-        .putAll(BASE_PROPS)
-        .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, "MY_DEST_TABLE")
-        .put(Constants.Reference.REFERENCE_NAME, "DBTest")
-        .build(),
-      null);
+    ETLPlugin sinkConfig = getSinkConfig();
 
     deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, "testDBSink");
     createInputData(inputDatasetName);
@@ -159,4 +181,16 @@ private void createInputData(String inputDatasetName) throws Exception {
     }
     MockSource.writeInput(inputManager, inputRecords);
   }
+
+  private ETLPlugin getSinkConfig() {
+    return new ETLPlugin(
+      AuroraMysqlConstants.PLUGIN_NAME,
+      BatchSink.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+        .putAll(BASE_PROPS)
+        .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, "MY_DEST_TABLE")
+        .put(Constants.Reference.REFERENCE_NAME, "DBTest")
+        .build(),
+      null);
+  }
 }

aurora-postgresql-plugin/src/test/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSinkTestRun.java

Lines changed: 44 additions & 10 deletions
@@ -28,6 +28,7 @@
 import io.cdap.plugin.common.Constants;
 import io.cdap.plugin.db.batch.sink.AbstractDBSink;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.math.BigDecimal;
@@ -51,21 +52,41 @@
  */
 public class AuroraPostgresSinkTestRun extends AuroraPostgresPluginTestBase {
 
+  @Before
+  public void setup() throws Exception {
+    try (Connection connection = createConnection();
+         Statement stmt = connection.createStatement()) {
+      stmt.execute("TRUNCATE TABLE \"MY_DEST_TABLE\"");
+    }
+  }
+
+  @Test
+  public void testDBSinkWithInvalidFieldType() throws Exception {
+    testDBInvalidFieldType("ID", Schema.Type.STRING, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+  }
+
+  @Test
+  public void testDBSinkWithInvalidFieldLogicalType() throws Exception {
+    testDBInvalidFieldLogicalType("TIMESTAMP_COL", Schema.Type.LONG, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+  }
+
+  @Test
+  public void testDBSinkWithDBSchemaAndInvalidData() throws Exception {
+    String stringColumnName = "NAME";
+    startPipelineAndWriteInvalidData(stringColumnName, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+    try (Connection conn = createConnection();
+         Statement stmt = conn.createStatement();
+         ResultSet resultSet = stmt.executeQuery("SELECT * FROM \"MY_DEST_TABLE\"")) {
+      testInvalidDataWrite(resultSet, stringColumnName);
+    }
+  }
+
   @Test
   public void testDBSink() throws Exception {
     String inputDatasetName = "input-dbsinktest";
 
     ETLPlugin sourceConfig = MockSource.getPlugin(inputDatasetName);
-    ETLPlugin sinkConfig = new ETLPlugin(
-      AuroraPostgresConstants.PLUGIN_NAME,
-      BatchSink.PLUGIN_TYPE,
-      ImmutableMap.<String, String>builder()
-        .putAll(BASE_PROPS)
-        .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, "MY_DEST_TABLE")
-        .put(Constants.Reference.REFERENCE_NAME, "DBTest")
-        .build(),
-      null);
-
+    ETLPlugin sinkConfig = getSinkConfig();
     deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, "testDBSink");
     createInputData(inputDatasetName);
 
@@ -136,4 +157,17 @@ private void createInputData(String inputDatasetName) throws Exception {
     }
     MockSource.writeInput(inputManager, inputRecords);
   }
+
+  private ETLPlugin getSinkConfig() {
+    return new ETLPlugin(
+      AuroraPostgresConstants.PLUGIN_NAME,
+      BatchSink.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+        .putAll(BASE_PROPS)
+        .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, "MY_DEST_TABLE")
+        .put(Constants.Reference.REFERENCE_NAME, "DBTest")
+        .build(),
+      null);
+
+  }
 }

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 23 additions & 9 deletions
@@ -98,6 +98,12 @@ private String getJDBCPluginId() {
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
     super.configurePipeline(pipelineConfigurer);
     DBUtils.validateJDBCPluginPipeline(pipelineConfigurer, dbSinkConfig, getJDBCPluginId());
+    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
+    if (Objects.nonNull(inputSchema)) {
+      Class<? extends Driver> driverClass = DBUtils.getDriverClass(
+        pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
+      validateSchema(driverClass, dbSinkConfig.tableName, inputSchema);
+    }
   }
 
   @Override
@@ -116,7 +122,7 @@ public void prepareRun(BatchSinkContext context) {
     Class<? extends Driver> driverClass = context.loadPluginClass(getJDBCPluginId());
     // make sure that the destination table exists and column types are correct
     try {
-      if (Objects.nonNull(context.getInputSchema())) {
+      if (Objects.nonNull(outputSchema)) {
         validateSchema(driverClass, dbSinkConfig.tableName, outputSchema);
       } else {
         outputSchema = inferSchema(driverClass);
@@ -280,9 +286,9 @@ private void validateSchema(Class<? extends Driver> jdbcDriverClass, String tabl
     try {
       DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.jdbcPluginName);
     } catch (IllegalAccessException | InstantiationException | SQLException e) {
-      LOG.error("Unable to load or register JDBC driver {} while checking for the existence of the database table {}.",
-                jdbcDriverClass, tableName, e);
-      throw Throwables.propagate(e);
+      throw new InvalidStageException(String.format("Unable to load or register JDBC driver '%s' while checking for " +
+                                                      "the existence of the database table '%s'.",
+                                                    jdbcDriverClass, tableName), e);
     }
 
     Properties connectionProperties = new Properties();
@@ -327,15 +333,23 @@ private void validateFields(Schema inputSchema, ResultSet rs) throws SQLExceptio
       boolean isColumnNullable = (ResultSetMetaData.columnNullable == rsMetaData.isNullable(columnIndex));
 
       Schema.Type columnType = columnSchema.getType();
-      Schema.Type fieldType = field.getSchema().isNullable() ? field.getSchema().getNonNullable().getType()
-        : field.getSchema().getType();
+      Schema.LogicalType columnLogicalType = columnSchema.getLogicalType();
+
+      Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
+      Schema.Type fieldType = fieldSchema.getType();
+      Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
+
       boolean isNotNullAssignable = !isColumnNullable && field.getSchema().isNullable();
-      boolean isNotCompatible = !Objects.equals(fieldType, columnType);
+      boolean isNotCompatible = !(Objects.equals(fieldType, columnType)
+        && Objects.equals(fieldLogicalType, columnLogicalType));
 
       if (isNotCompatible) {
         invalidFields.add(field.getName());
-        LOG.error("Field {} was given as type {} but the database column is actually of type {}.", field.getName(),
-                  fieldType, columnSchema.getType());
+        LOG.error("Field {} was given as type {} but the database column is actually of type {}.",
+                  field.getName(),
+                  fieldLogicalType != null ? fieldLogicalType.getToken() : fieldType,
+                  columnLogicalType != null ? columnLogicalType.getToken() : columnType
+        );
       }
       if (isNotNullAssignable) {
         invalidFields.add(field.getName());
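The core of the new check is the compatibility predicate above: a field matches a column only when both the physical Schema.Type and the Schema.LogicalType agree, after unwrapping a nullable field schema. A standalone sketch of that predicate (the helper name is ours; the real code runs inside the loop over ResultSetMetaData columns):

    import io.cdap.cdap.api.data.schema.Schema;
    import java.util.Objects;

    public class TypeCompatibility {
      // Mirrors the new isNotCompatible logic: unwrap a nullable schema, then
      // compare both the physical type and the logical type against the column.
      static boolean isCompatible(Schema fieldSchema, Schema columnSchema) {
        Schema actual = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
        return Objects.equals(actual.getType(), columnSchema.getType())
          && Objects.equals(actual.getLogicalType(), columnSchema.getLogicalType());
      }

      public static void main(String[] args) {
        Schema timestampColumn = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
        // Both schemas are LONG underneath, but only one carries the logical
        // type, so the plain LONG field is now reported as invalid.
        System.out.println(isCompatible(Schema.nullableOf(Schema.of(Schema.Type.LONG)), timestampColumn)); // false
        System.out.println(isCompatible(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS), timestampColumn)); // true
      }
    }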

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/ETLDBOutputFormat.java

Lines changed: 8 additions & 2 deletions
@@ -110,9 +110,15 @@ public void close(TaskAttemptContext context) throws IOException {
       }
 
       @Override
-      public void write(K key, V value) throws IOException {
-        super.write(key, value);
+      public void write(K key, V value) {
         emptyData = false;
+        // Log write failures ourselves so that information about the error is not lost
+        try {
+          key.write(getStatement());
+          getStatement().addBatch();
+        } catch (SQLException e) {
+          LOG.warn("Failed to write value to database", e);
+        }
       }
     };
   } catch (Exception ex) {
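Note the behavioral change in write(): a record that fails to bind no longer aborts the task with an exception; it is logged and skipped, which is what lets testInvalidDataWrite assert that the bad value never reaches the table. A self-contained sketch of the same skip-on-error pattern; the H2 in-memory database, table, and rows are illustrative stand-ins, and executeUpdate() is used per row here where the plugin queues with addBatch():

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class SkipOnErrorWrite {
      public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
          try (Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE MY_DEST_TABLE (ID INT, NAME VARCHAR(10))");
          }
          try (PreparedStatement ps = conn.prepareStatement("INSERT INTO MY_DEST_TABLE VALUES (?, ?)")) {
            Object[][] rows = {{1, "user1"}, {2, "this name is far too long"}, {3, "user3"}};
            for (Object[] row : rows) {
              try {
                ps.setObject(1, row[0]);
                ps.setObject(2, row[1]);
                ps.executeUpdate();
              } catch (SQLException e) {
                // Mirror of the new write(): log the failure, keep the batch going
                System.err.println("Failed to write value to database: " + e.getMessage());
              }
            }
          }
          try (Statement stmt = conn.createStatement();
               ResultSet rs = stmt.executeQuery("SELECT ID FROM MY_DEST_TABLE ORDER BY ID")) {
            while (rs.next()) {
              System.out.println(rs.getInt(1)); // prints 1 and 3; the oversized row was skipped
            }
          }
        }
      }
    }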

database-commons/src/test/java/io/cdap/plugin/db/batch/DatabasePluginTestBase.java

Lines changed: 100 additions & 5 deletions
@@ -18,7 +18,11 @@
 
 import com.google.common.collect.ImmutableMap;
 import io.cdap.cdap.api.artifact.ArtifactSummary;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Table;
 import io.cdap.cdap.datapipeline.SmartWorkflow;
+import io.cdap.cdap.etl.mock.batch.MockSource;
 import io.cdap.cdap.etl.mock.test.HydratorTestBase;
 import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
 import io.cdap.cdap.etl.proto.v2.ETLPlugin;
@@ -28,9 +32,14 @@
 import io.cdap.cdap.proto.id.ApplicationId;
 import io.cdap.cdap.proto.id.NamespaceId;
 import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
 import io.cdap.cdap.test.WorkflowManager;
 import org.junit.Assert;
 
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +50,13 @@
  */
 public class DatabasePluginTestBase extends HydratorTestBase {
 
+  public static Schema getSchemaWithInvalidTypeMapping(String columnName, Schema.Type type) {
+    return Schema.recordOf(
+      "wrongDBRecord",
+      Schema.Field.of(columnName, Schema.of(type))
+    );
+  }
+
   protected static void assertDeploymentFailure(ApplicationId appId, ETLBatchConfig etlConfig,
                                                 ArtifactSummary datapipelineArtifact, String failureMessage)
     throws Exception {
@@ -66,17 +82,20 @@ protected static void assertRuntimeFailure(ApplicationId appId, ETLBatchConfig e
   protected ApplicationManager deployETL(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin,
                                          ArtifactSummary datapipelineArtifact, String appName)
     throws Exception {
+    ETLBatchConfig etlConfig = getETLBatchConfig(sourcePlugin, sinkPlugin);
+    AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(datapipelineArtifact, etlConfig);
+    ApplicationId appId = NamespaceId.DEFAULT.app(appName);
+    return deployApplication(appId, appRequest);
+  }
+
+  protected ETLBatchConfig getETLBatchConfig(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin) {
     ETLStage source = new ETLStage("source", sourcePlugin);
     ETLStage sink = new ETLStage("sink", sinkPlugin);
-    ETLBatchConfig etlConfig = ETLBatchConfig.builder()
+    return ETLBatchConfig.builder()
       .addStage(source)
       .addStage(sink)
       .addConnection(source.getName(), sink.getName())
       .build();
-
-    AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(datapipelineArtifact, etlConfig);
-    ApplicationId appId = NamespaceId.DEFAULT.app(appName);
-    return deployApplication(appId, appRequest);
   }
 
   protected void runETLOnce(ApplicationManager appManager) throws TimeoutException,
@@ -91,4 +110,80 @@ protected void runETLOnce(ApplicationManager appManager,
     workflowManager.start(arguments);
     workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES);
   }
+
+  protected void testDBInvalidFieldType(String columnName, Schema.Type type, ETLPlugin sinkConfig,
+                                        ArtifactSummary datapipelineArtifact) throws Exception {
+    String inputDatasetName = "input-dbsinktest-invalid-field-type";
+    Schema schema = getSchemaWithInvalidTypeMapping(columnName, type);
+    testDBSinkValidation(inputDatasetName, "testDBSinkWithInvalidFieldType", schema, datapipelineArtifact,
+                         sinkConfig);
+  }
+
+  protected void testDBInvalidFieldLogicalType(String columnName, Schema.Type type, ETLPlugin sinkConfig,
+                                               ArtifactSummary datapipelineArtifact) throws Exception {
+    String inputDatasetName = "input-dbsinktest-invalid-field-logical-type";
+    Schema schema = getSchemaWithInvalidTypeMapping(columnName, type);
+    testDBSinkValidation(inputDatasetName, "testDBSinkWithInvalidFieldLogicalType", schema,
+                         datapipelineArtifact, sinkConfig);
+  }
+
+  protected void testDBSinkValidation(String inputDatasetName, String appName, Schema schema,
+                                      ArtifactSummary datapipelineArtifact, ETLPlugin sinkConfig) throws Exception {
+    ETLPlugin sourceConfig = MockSource.getPlugin(inputDatasetName, schema);
+    ETLBatchConfig etlConfig = getETLBatchConfig(sourceConfig, sinkConfig);
+    ApplicationId appId = NamespaceId.DEFAULT.app(appName);
+    assertDeploymentFailure(appId, etlConfig, datapipelineArtifact, "No fail message on schema validation");
+  }
+
+  protected void writeDataForInvalidDataWriteTest(String inputDatasetName, String stringColumnName) throws Exception {
+    Schema validSchema = Schema.recordOf(
+      "wrongDBRecord",
+      Schema.Field.of("ID", Schema.of(Schema.Type.INT)),
+      Schema.Field.of(stringColumnName, Schema.of(Schema.Type.STRING))
+    );
+
+    Schema invalidSchema = Schema.recordOf(
+      "wrongDBRecord",
+      Schema.Field.of("ID", Schema.of(Schema.Type.INT)),
+      Schema.Field.of(stringColumnName, Schema.of(Schema.Type.INT))
+    );
+
+    // add some data to the input table
+    DataSetManager<Table> inputManager = getDataset(inputDatasetName);
+
+    List<StructuredRecord> inputRecords = new ArrayList<>();
+    inputRecords.add(StructuredRecord.builder(validSchema)
+                       .set("ID", 1)
+                       .set(stringColumnName, "user1")
+                       .build());
+    inputRecords.add(StructuredRecord.builder(invalidSchema)
+                       .set("ID", 2)
+                       .set(stringColumnName, 1)
+                       .build());
+    inputRecords.add(StructuredRecord.builder(validSchema)
+                       .set("ID", 3)
+                       .set(stringColumnName, "user3")
+                       .build());
+    MockSource.writeInput(inputManager, inputRecords);
+  }
+
+  protected void startPipelineAndWriteInvalidData(String stringColumnName, ETLPlugin sinkConfig,
+                                                  ArtifactSummary datapipelineArtifact) throws Exception {
+    String inputDatasetName = "input-dbsinktest-db-schema-invalid-schema-mapping";
+    ETLPlugin sourceConfig = MockSource.getPlugin(inputDatasetName);
+    ApplicationManager applicationManager = deployETL(sourceConfig, sinkConfig, datapipelineArtifact,
+                                                      "testDBSinkWithDBSchemaAndInvalidSchemaMapping");
+
+    writeDataForInvalidDataWriteTest(inputDatasetName, stringColumnName);
+    WorkflowManager workflowManager = applicationManager.getWorkflowManager(SmartWorkflow.NAME);
+    workflowManager.startAndWaitForRun(ProgramRunStatus.FAILED, 5, TimeUnit.MINUTES);
+  }
+
+  protected void testInvalidDataWrite(ResultSet resultSet, String columnName) throws SQLException {
+    List<String> users = new ArrayList<>();
+    while (resultSet.next()) {
+      users.add(resultSet.getString(columnName).trim());
+    }
+    Assert.assertFalse(users.contains("1"));
+  }
 }
