Skip to content

Commit 725d8a2

Browse files
CDAP-15658 fixed tests for generic databases + added tests for schemas and fields validation
1 parent 60e6554 commit 725d8a2

File tree

7 files changed

+231
-227
lines changed

7 files changed

+231
-227
lines changed

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.db.batch.sink;
1818

19+
import com.google.common.base.Preconditions;
1920
import com.google.common.base.Strings;
2021
import com.google.common.base.Throwables;
2122
import io.cdap.cdap.api.annotation.Description;
@@ -238,13 +239,36 @@ private void setResultSetMetadata() throws Exception {
238239
dbSinkConfig.getEscapedTableName()))
239240
) {
240241
ResultSetMetaData resultSetMetadata = rs.getMetaData();
241-
columnTypes.addAll(DBUtils.getMatchedColumnTypeList(resultSetMetadata, columns));
242+
columnTypes.addAll(getMatchedColumnTypeList(resultSetMetadata, columns));
242243
}
243244
}
244245

245246
this.columnTypes = Collections.unmodifiableList(columnTypes);
246247
}
247248

249+
/**
250+
* Compare columns from schema with columns in table and returns list of matched columns in {@link ColumnType} format.
251+
*
252+
* @param resultSetMetadata result set metadata from table.
253+
* @param columns list of columns from schema.
254+
* @return list of matched columns.
255+
*/
256+
static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMetadata, List<String> columns)
257+
throws SQLException {
258+
List<ColumnType> columnTypes = new ArrayList<>(columns.size());
259+
// JDBC driver column indices start with 1
260+
for (int i = 0; i < resultSetMetadata.getColumnCount(); i++) {
261+
String name = resultSetMetadata.getColumnName(i + 1);
262+
String columnTypeName = resultSetMetadata.getColumnTypeName(i + 1);
263+
int type = resultSetMetadata.getColumnType(i + 1);
264+
String schemaColumnName = columns.get(i);
265+
Preconditions.checkArgument(schemaColumnName.toLowerCase().equals(name.toLowerCase()),
266+
"Missing column '%s' in SQL table", schemaColumnName);
267+
columnTypes.add(new ColumnType(schemaColumnName, columnTypeName, type));
268+
}
269+
return columnTypes;
270+
}
271+
248272
private void validateSchema(Class<? extends Driver> jdbcDriverClass, String tableName, Schema inputSchema) {
249273
String connectionString = dbSinkConfig.getConnectionString();
250274

database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.cdap.cdap.etl.api.PipelineConfigurer;
3030
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3131
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
32+
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
3233
import io.cdap.cdap.etl.api.validation.InvalidStageException;
3334
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
3435
import io.cdap.plugin.common.LineageRecorder;
@@ -84,7 +85,7 @@ private static String removeConditionsClause(String importQueryString) {
8485
if (importQueryString.toUpperCase().contains("WHERE $CONDITIONS AND")) {
8586
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("$CONDITIONS AND"), "");
8687
} else if (importQueryString.toUpperCase().contains("WHERE $CONDITIONS")) {
87-
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("WHERE $CONDITIONS"), "");
88+
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("WHERE $CONDITIONS"), "");
8889
} else if (importQueryString.toUpperCase().contains("AND $CONDITIONS")) {
8990
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("AND $CONDITIONS"), "");
9091
} else if (importQueryString.toUpperCase().contains("$CONDITIONS")) {
@@ -380,7 +381,30 @@ private void validate() {
380381
}
381382

382383
private void validateSchema(Schema actualSchema) {
383-
DBUtils.validateSourceSchema(actualSchema, getSchema());
384+
validateSchema(actualSchema, getSchema());
385+
}
386+
387+
static void validateSchema(Schema actualSchema, Schema configSchema) {
388+
if (configSchema == null) {
389+
throw new InvalidConfigPropertyException("Schema should not be null or empty", SCHEMA);
390+
}
391+
for (Schema.Field field : configSchema.getFields()) {
392+
Schema.Field actualField = actualSchema.getField(field.getName());
393+
if (actualField == null) {
394+
throw new InvalidConfigPropertyException(String.format("Schema field '%s' is not present in actual record",
395+
field.getName()), SCHEMA);
396+
}
397+
Schema actualFieldSchema = actualField.getSchema().isNullable() ?
398+
actualField.getSchema().getNonNullable() : actualField.getSchema();
399+
Schema expectedFieldSchema = field.getSchema().isNullable() ?
400+
field.getSchema().getNonNullable() : field.getSchema();
401+
402+
if (!actualFieldSchema.equals(expectedFieldSchema)) {
403+
throw new IllegalArgumentException(
404+
String.format("Schema field '%s' has type '%s' but found '%s' in input record",
405+
field.getName(), expectedFieldSchema.getType(), actualFieldSchema.getType()));
406+
}
407+
}
384408
}
385409

386410
@Nullable

database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717
package io.cdap.plugin.util;
1818

1919
import com.google.common.base.Preconditions;
20-
import io.cdap.cdap.api.data.schema.Schema;
2120
import io.cdap.cdap.api.plugin.PluginProperties;
2221
import io.cdap.cdap.etl.api.PipelineConfigurer;
2322
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
24-
import io.cdap.plugin.db.ColumnType;
2523
import io.cdap.plugin.db.ConnectionConfig;
2624
import io.cdap.plugin.db.JDBCDriverShim;
27-
import io.cdap.plugin.db.batch.source.AbstractDBSource;
2825
import org.slf4j.Logger;
2926
import org.slf4j.LoggerFactory;
3027

@@ -37,10 +34,8 @@
3734
import java.sql.Driver;
3835
import java.sql.DriverManager;
3936
import java.sql.ResultSet;
40-
import java.sql.ResultSetMetaData;
4137
import java.sql.SQLException;
4238
import java.sql.Types;
43-
import java.util.ArrayList;
4439
import java.util.Hashtable;
4540
import java.util.List;
4641
import java.util.Objects;
@@ -189,60 +184,6 @@ public static Class<? extends Driver> getDriverClass(PipelineConfigurer pipeline
189184
jdbcPluginId, PluginProperties.builder().build());
190185
}
191186

192-
/**
193-
* Checks if fields from schema are compatible to be written into database.
194-
*
195-
* @param actualSchema schema from db.
196-
* @param configSchema schema from config.
197-
*/
198-
public static void validateSourceSchema(Schema actualSchema, Schema configSchema) {
199-
if (configSchema == null) {
200-
throw new InvalidConfigPropertyException("Schema should not be null or empty",
201-
AbstractDBSource.DBSourceConfig.SCHEMA);
202-
}
203-
for (Schema.Field field : configSchema.getFields()) {
204-
Schema.Field actualField = actualSchema.getField(field.getName());
205-
if (actualField == null) {
206-
throw new InvalidConfigPropertyException(String.format("Schema field '%s' is not present in actual record",
207-
field.getName()),
208-
AbstractDBSource.DBSourceConfig.SCHEMA);
209-
}
210-
Schema actualFieldSchema = actualField.getSchema().isNullable() ?
211-
actualField.getSchema().getNonNullable() : actualField.getSchema();
212-
Schema expectedFieldSchema = field.getSchema().isNullable() ?
213-
field.getSchema().getNonNullable() : field.getSchema();
214-
215-
if (!actualFieldSchema.equals(expectedFieldSchema)) {
216-
throw new IllegalArgumentException(
217-
String.format("Schema field '%s' has type '%s' but found '%s' in input record",
218-
field.getName(), expectedFieldSchema.getType(), actualFieldSchema.getType()));
219-
}
220-
}
221-
}
222-
223-
/**
224-
* Compare columns from schema with columns in table and returns list of matched columns in {@link ColumnType} format.
225-
*
226-
* @param resultSetMetadata result set metadata from table.
227-
* @param columns list of columns from schema.
228-
* @return list of matched columns.
229-
*/
230-
public static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMetadata, List<String> columns)
231-
throws SQLException {
232-
List<ColumnType> columnTypes = new ArrayList<>(columns.size());
233-
// JDBC driver column indices start with 1
234-
for (int i = 0; i < resultSetMetadata.getColumnCount(); i++) {
235-
String name = resultSetMetadata.getColumnName(i + 1);
236-
String columnTypeName = resultSetMetadata.getColumnTypeName(i + 1);
237-
int type = resultSetMetadata.getColumnType(i + 1);
238-
String schemaColumnName = columns.get(i);
239-
Preconditions.checkArgument(schemaColumnName.toLowerCase().equals(name.toLowerCase()),
240-
"Missing column '%s' in SQL table", schemaColumnName);
241-
columnTypes.add(new ColumnType(schemaColumnName, columnTypeName, type));
242-
}
243-
return columnTypes;
244-
}
245-
246187
/**
247188
* Shuts down a cleanup thread com.mysql.jdbc.AbandonedConnectionCleanupThread that mysql driver fails to destroy
248189
* If this is not done, the thread keeps a reference to the classloader, thereby causing OOMs or too many open files

database-commons/src/test/java/io/cdap/plugin/db/DBUtilsTest.java

Lines changed: 0 additions & 162 deletions
This file was deleted.

0 commit comments

Comments
 (0)