Skip to content

Commit a72dd1c

Browse files
committed
CDAP-15551 DB2 db plugin enhancements: all data types support + proper test coverage - map DECFLOAT to string.
1 parent 75d5172 commit a72dd1c

File tree

6 files changed

+94
-30
lines changed

6 files changed

+94
-30
lines changed

db2-plugin/docs/Db2-batchsource.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,29 @@ non-nullable varchars, output records will have this schema:
7878
| name | string |
7979
| email | string |
8080
| phone | string |
81+
82+
Data Types Mapping
83+
------
84+
| DB2 Data Type | CDAP Schema Data Type | Comment |
85+
|------------------------------|-----------------------|---------|
86+
| SMALLINT | int | |
87+
| INTEGER | int | |
88+
| BIGINT | long | |
89+
| DECIMAL(p,s) or NUMERIC(p,s) | decimal | |
90+
| DECFLOAT | string | |
91+
| REAL | float | |
92+
| DOUBLE | double | |
93+
| CHAR | string | |
94+
| VARCHAR | string | |
95+
| CHAR(n) FOR BIT DATA | bytes | |
96+
| VARCHAR(n) FOR BIT DATA | bytes | |
97+
| BINARY | bytes | |
98+
| VARBINARY | bytes | |
99+
| GRAPHIC | string | |
100+
| VARGRAPHIC | string | |
101+
| CLOB | string | |
102+
| BLOB | bytes | |
103+
| DBCLOB | string | |
104+
| DATE | date | |
105+
| TIME | time | |
106+
| TIMESTAMP | timestamp | |

db2-plugin/src/main/java/io/cdap/plugin/db2/DB2Record.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private void handleSpecificType(ResultSet resultSet,
6565
String columnTypeName = metaData.getColumnTypeName(columnIndex);
6666

6767
if (DB2SchemaReader.DB2_DECFLOAT.equals(columnTypeName)) {
68-
recordBuilder.set(field.getName(), resultSet.getBigDecimal(columnIndex).doubleValue());
68+
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
6969
}
7070
}
7171
}

db2-plugin/src/main/java/io/cdap/plugin/db2/DB2SchemaReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class DB2SchemaReader extends CommonSchemaReader {
3737
@Override
3838
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
3939
if (DB2_TYPES.contains(metadata.getColumnType(index)) && DB2_DECFLOAT.equals(metadata.getColumnTypeName(index))) {
40-
return Schema.of(Schema.Type.DOUBLE);
40+
return Schema.of(Schema.Type.STRING);
4141
} else {
4242
return super.getSchema(metadata, index);
4343
}

db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Sink.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,18 @@
2020
import io.cdap.cdap.api.annotation.Name;
2121
import io.cdap.cdap.api.annotation.Plugin;
2222
import io.cdap.cdap.api.data.format.StructuredRecord;
23+
import io.cdap.cdap.api.data.schema.Schema;
2324
import io.cdap.cdap.etl.api.batch.BatchSink;
2425
import io.cdap.plugin.db.DBRecord;
2526
import io.cdap.plugin.db.SchemaReader;
2627
import io.cdap.plugin.db.batch.config.DBSpecificSinkConfig;
2728
import io.cdap.plugin.db.batch.sink.AbstractDBSink;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.sql.ResultSetMetaData;
33+
import java.sql.SQLException;
34+
import java.util.Objects;
2835

2936

3037
/**
@@ -34,6 +41,7 @@
3441
@Name(Db2Constants.PLUGIN_NAME)
3542
@Description("Writes records to a DB2 table. Each record will be written in a row in the table.")
3643
public class Db2Sink extends AbstractDBSink {
44+
private static final Logger LOG = LoggerFactory.getLogger(Db2Sink.class);
3745

3846
private final Db2SinkConfig db2SinkConfig;
3947

@@ -61,4 +69,24 @@ protected DBRecord getDBRecord(StructuredRecord.Builder output) {
6169
protected SchemaReader getSchemaReader() {
6270
return new DB2SchemaReader();
6371
}
72+
73+
@Override
74+
protected boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, int index) throws SQLException {
75+
Schema.Type fieldType = field.getSchema().isNullable() ? field.getSchema().getNonNullable().getType()
76+
: field.getSchema().getType();
77+
78+
//DECFLOAT is mapped to string
79+
String colTypeName = metadata.getColumnTypeName(index);
80+
if (DB2SchemaReader.DB2_DECFLOAT.equals(colTypeName)) {
81+
if (Objects.equals(fieldType, Schema.Type.STRING)) {
82+
return true;
83+
} else {
84+
LOG.error("Field '{}' was given as type '{}' but must be of type 'string' for the DB2 column of " +
85+
"DECFLOAT type.", field.getName(), fieldType);
86+
return false;
87+
}
88+
}
89+
90+
return super.isFieldCompatible(field, metadata, index);
91+
}
6492
}

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2SinkTestRun.java

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -53,29 +53,29 @@
5353
*/
5454
public class Db2SinkTestRun extends Db2PluginTestBase {
5555
private static final Schema SCHEMA = Schema.recordOf(
56-
"dbRecord",
57-
Schema.Field.of("SMALLINT_COL", Schema.of(Schema.Type.INT)),
58-
Schema.Field.of("INTEGER_COL", Schema.of(Schema.Type.INT)),
59-
Schema.Field.of("BIGINT_COL", Schema.of(Schema.Type.LONG)),
60-
Schema.Field.of("DECIMAL_COL", Schema.decimalOf(10, 6)),
61-
Schema.Field.of("NUMERIC_COL", Schema.decimalOf(10, 6)),
62-
Schema.Field.of("DECFLOAT_COL", Schema.of(Schema.Type.DOUBLE)),
63-
Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
64-
Schema.Field.of("DOUBLE_COL", Schema.of(Schema.Type.DOUBLE)),
65-
Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)),
66-
Schema.Field.of("VARCHAR_COL", Schema.of(Schema.Type.STRING)),
67-
Schema.Field.of("CHAR_BIT_COL", Schema.of(Schema.Type.BYTES)),
68-
Schema.Field.of("VARCHAR_BIT_COL", Schema.of(Schema.Type.BYTES)),
69-
Schema.Field.of("GRAPHIC_COL", Schema.of(Schema.Type.STRING)),
70-
Schema.Field.of("CLOB_COL", Schema.of(Schema.Type.STRING)),
71-
Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES)),
72-
Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
73-
Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
74-
Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
75-
Schema.Field.of("BINARY_COL", Schema.of(Schema.Type.BYTES)),
76-
Schema.Field.of("VARBINARY_COL", Schema.of(Schema.Type.BYTES)),
77-
Schema.Field.of("VARGRAPHIC_COL", Schema.of(Schema.Type.STRING)),
78-
Schema.Field.of("DBCLOB_COL", Schema.of(Schema.Type.STRING))
56+
"dbRecord",
57+
Schema.Field.of("SMALLINT_COL", Schema.of(Schema.Type.INT)),
58+
Schema.Field.of("INTEGER_COL", Schema.of(Schema.Type.INT)),
59+
Schema.Field.of("BIGINT_COL", Schema.of(Schema.Type.LONG)),
60+
Schema.Field.of("DECIMAL_COL", Schema.decimalOf(10, 6)),
61+
Schema.Field.of("NUMERIC_COL", Schema.decimalOf(10, 6)),
62+
Schema.Field.of("DECFLOAT_COL", Schema.of(Schema.Type.STRING)),
63+
Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
64+
Schema.Field.of("DOUBLE_COL", Schema.of(Schema.Type.DOUBLE)),
65+
Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)),
66+
Schema.Field.of("VARCHAR_COL", Schema.of(Schema.Type.STRING)),
67+
Schema.Field.of("CHAR_BIT_COL", Schema.of(Schema.Type.BYTES)),
68+
Schema.Field.of("VARCHAR_BIT_COL", Schema.of(Schema.Type.BYTES)),
69+
Schema.Field.of("GRAPHIC_COL", Schema.of(Schema.Type.STRING)),
70+
Schema.Field.of("CLOB_COL", Schema.of(Schema.Type.STRING)),
71+
Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES)),
72+
Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
73+
Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
74+
Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
75+
Schema.Field.of("BINARY_COL", Schema.of(Schema.Type.BYTES)),
76+
Schema.Field.of("VARBINARY_COL", Schema.of(Schema.Type.BYTES)),
77+
Schema.Field.of("VARGRAPHIC_COL", Schema.of(Schema.Type.STRING)),
78+
Schema.Field.of("DBCLOB_COL", Schema.of(Schema.Type.STRING))
7979
);
8080

8181
@Before
@@ -107,6 +107,16 @@ public void testDBSinkWithDBSchemaAndInvalidData() throws Exception {
107107
}
108108
}
109109

110+
@Test
111+
public void testDBSinkWithExplicitInputSchema() throws Exception {
112+
testDBSink("testDBSinkWithExplicitInputSchema", "input-dbsinktest-explicit", true);
113+
}
114+
115+
@Test
116+
public void testDBSinkWithInferredInputSchema() throws Exception {
117+
testDBSink("testDBSinkWithInferredInputSchema", "input-dbsinktest-inferred", false);
118+
}
119+
110120
public void testDBSink(String appName, String inputDatasetName, boolean setInputSchema) throws Exception {
111121
ETLPlugin sourceConfig = (setInputSchema)
112122
? MockSource.getPlugin(inputDatasetName, SCHEMA)
@@ -199,7 +209,7 @@ private void createInputData(String inputDatasetName) throws Exception {
199209
.set("BIGINT_COL", (long) i)
200210
.setDecimal("NUMERIC_COL", new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE))
201211
.setDecimal("DECIMAL_COL", new BigDecimal(3.459d, new MathContext(PRECISION)).setScale(SCALE))
202-
.set("DECFLOAT_COL", .42 + i)
212+
.set("DECFLOAT_COL", Double.toString(.42 + i))
203213
.set("REAL_COL", 24.123f + i)
204214
.set("DOUBLE_COL", 3.456)
205215
.set("CHAR_COL", name)

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2SourceTestRun.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ public void testDBMacroSupport() throws Exception {
8686
public void testDBSource() throws Exception {
8787
String importQuery = "SELECT SMALLINT_COL, INTEGER_COL, BIGINT_COL, DECIMAL_COL, NUMERIC_COL, " +
8888
" REAL_COL, DOUBLE_COL, CHAR_COL, DECFLOAT_COL, VARCHAR_COL, CHAR_BIT_COL, VARCHAR_BIT_COL, GRAPHIC_COL, " +
89-
" CLOB_COL, BLOB_COL, DATE_COL, TIME_COL, TIMESTAMP_COL, BINARY_COL, VARBINARY_COL, VARGRAPHIC_COL, DBCLOB_COL " +
90-
"FROM my_table WHERE SMALLINT_COL < 3 AND $CONDITIONS";
89+
" CLOB_COL, BLOB_COL, DATE_COL, TIME_COL, TIMESTAMP_COL, BINARY_COL, VARBINARY_COL, VARGRAPHIC_COL, DBCLOB_COL" +
90+
" FROM my_table WHERE SMALLINT_COL < 3 AND $CONDITIONS";
9191

9292

9393
String boundingQuery = "SELECT MIN(SMALLINT_COL),MAX(SMALLINT_COL) from my_table";
@@ -135,8 +135,8 @@ public void testDBSource() throws Exception {
135135
row1.getDecimal("NUMERIC_COL"));
136136
Assert.assertEquals(new BigDecimal(5.14, new MathContext(PRECISION)).setScale(SCALE),
137137
row2.getDecimal("NUMERIC_COL"));
138-
Assert.assertEquals(4.14, row1.get("DECFLOAT_COL"), 0.00001);
139-
Assert.assertEquals(5.14, row2.get("DECFLOAT_COL"), 0.00001);
138+
Assert.assertTrue(((String) row1.get("DECFLOAT_COL")).startsWith("4.14"));
139+
Assert.assertTrue(((String) row2.get("DECFLOAT_COL")).startsWith("5.14"));
140140
Assert.assertEquals(4.14f, row1.get("REAL_COL"), 0.00001f);
141141
Assert.assertEquals(5.14f, row2.get("REAL_COL"), 0.00001f);
142142
Assert.assertEquals(4.14, row1.get("DOUBLE_COL"), 0.00001);

0 commit comments

Comments
 (0)