Skip to content

Commit eff7e9e

Browse files
authored
Merge pull request #49 from vladhlinskiy/feature/CDAP-15551-db2-all-data-types-tests
CDAP-15551 DB2 db plugin enhancements: all data types support and proper test coverage
2 parents 076cdaa + a72dd1c commit eff7e9e

File tree

9 files changed

+161
-44
lines changed

9 files changed

+161
-44
lines changed

database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ private Schema loadSchemaFromDB(Class<? extends Driver> driverClass)
156156
try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
157157
executeInitQueries(connection, sourceConfig.getInitQueries());
158158
return loadSchemaFromDB(connection, sourceConfig.importQuery);
159+
160+
} catch (SQLException e) {
161+
// Rethrow as a plain SQLException (message, SQL state and error code only) so that driver-specific SQLException subclasses are not exposed to contexts that do not have the JDBC driver on their classpath.
162+
throw new SQLException(e.getMessage(), e.getSQLState(), e.getErrorCode());
159163
} finally {
160164
driverCleanup.destroy();
161165
}

db2-plugin/docs/Db2-batchsource.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,29 @@ non-nullable varchars, output records will have this schema:
7878
| name | string |
7979
| email | string |
8080
| phone | string |
81+
82+
Data Types Mapping
83+
------
84+
| DB2 Data Type | CDAP Schema Data Type | Comment |
85+
|------------------------------|-----------------------|---------|
86+
| SMALLINT | int | |
87+
| INTEGER | int | |
88+
| BIGINT | long | |
89+
| DECIMAL(p,s) or NUMERIC(p,s) | decimal | |
90+
| DECFLOAT | string | |
91+
| REAL | float | |
92+
| DOUBLE | double | |
93+
| CHAR | string | |
94+
| VARCHAR | string | |
95+
| CHAR(n) FOR BIT DATA | bytes | |
96+
| VARCHAR(n) FOR BIT DATA | bytes | |
97+
| BINARY | bytes | |
98+
| VARBINARY | bytes | |
99+
| GRAPHIC | string | |
100+
| VARGRAPHIC | string | |
101+
| CLOB | string | |
102+
| BLOB | bytes | |
103+
| DBCLOB | string | |
104+
| DATE | date | |
105+
| TIME | time | |
106+
| TIMESTAMP | timestamp | |

db2-plugin/src/main/java/io/cdap/plugin/db2/DB2Record.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private void handleSpecificType(ResultSet resultSet,
6565
String columnTypeName = metaData.getColumnTypeName(columnIndex);
6666

6767
if (DB2SchemaReader.DB2_DECFLOAT.equals(columnTypeName)) {
68-
recordBuilder.set(field.getName(), resultSet.getBigDecimal(columnIndex).doubleValue());
68+
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
6969
}
7070
}
7171
}

db2-plugin/src/main/java/io/cdap/plugin/db2/DB2SchemaReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class DB2SchemaReader extends CommonSchemaReader {
3737
@Override
3838
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
3939
if (DB2_TYPES.contains(metadata.getColumnType(index)) && DB2_DECFLOAT.equals(metadata.getColumnTypeName(index))) {
40-
return Schema.of(Schema.Type.DOUBLE);
40+
return Schema.of(Schema.Type.STRING);
4141
} else {
4242
return super.getSchema(metadata, index);
4343
}

db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Sink.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,18 @@
2020
import io.cdap.cdap.api.annotation.Name;
2121
import io.cdap.cdap.api.annotation.Plugin;
2222
import io.cdap.cdap.api.data.format.StructuredRecord;
23+
import io.cdap.cdap.api.data.schema.Schema;
2324
import io.cdap.cdap.etl.api.batch.BatchSink;
2425
import io.cdap.plugin.db.DBRecord;
2526
import io.cdap.plugin.db.SchemaReader;
2627
import io.cdap.plugin.db.batch.config.DBSpecificSinkConfig;
2728
import io.cdap.plugin.db.batch.sink.AbstractDBSink;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.sql.ResultSetMetaData;
33+
import java.sql.SQLException;
34+
import java.util.Objects;
2835

2936

3037
/**
@@ -34,6 +41,7 @@
3441
@Name(Db2Constants.PLUGIN_NAME)
3542
@Description("Writes records to a DB2 table. Each record will be written in a row in the table.")
3643
public class Db2Sink extends AbstractDBSink {
44+
private static final Logger LOG = LoggerFactory.getLogger(Db2Sink.class);
3745

3846
private final Db2SinkConfig db2SinkConfig;
3947

@@ -61,4 +69,24 @@ protected DBRecord getDBRecord(StructuredRecord.Builder output) {
6169
protected SchemaReader getSchemaReader() {
6270
return new DB2SchemaReader();
6371
}
72+
73+
/**
 * Checks whether an input schema field is compatible with the database column at the given index.
 * A column whose type name equals {@code DB2SchemaReader.DB2_DECFLOAT} is accepted only when the field's
 * type (after unwrapping a nullable schema) is string; every other column is delegated to the superclass.
 *
 * @param field input schema field to check
 * @param metadata result set metadata used to look up the column type name
 * @param index index of the column in {@code metadata} to compare the field against
 * @return {@code true} if the field may be written to the column, {@code false} otherwise
 * @throws SQLException propagated from the metadata lookup or from the superclass check
 */
@Override
74+
protected boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, int index) throws SQLException {
75+
// Unwrap a nullable schema so the comparison below is made against the underlying type.
Schema.Type fieldType = field.getSchema().isNullable() ? field.getSchema().getNonNullable().getType()
76+
: field.getSchema().getType();
77+
78+
// DECFLOAT columns are mapped to string, so only string fields are compatible with them.
79+
String colTypeName = metadata.getColumnTypeName(index);
80+
if (DB2SchemaReader.DB2_DECFLOAT.equals(colTypeName)) {
81+
if (Objects.equals(fieldType, Schema.Type.STRING)) {
82+
return true;
83+
} else {
84+
// Log the mismatch so the rejected field and its actual type are visible to the user.
LOG.error("Field '{}' was given as type '{}' but must be of type 'string' for the DB2 column of " +
85+
"DECFLOAT type.", field.getName(), fieldType);
86+
return false;
87+
}
88+
}
89+
90+
// Not a DECFLOAT column: fall back to the generic compatibility rules of the superclass.
return super.isFieldCompatible(field, metadata, index);
91+
}
6492
}

db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.cdap.cdap.api.annotation.Name;
2121
import io.cdap.cdap.api.annotation.Plugin;
2222
import io.cdap.cdap.etl.api.batch.BatchSource;
23+
import io.cdap.plugin.db.SchemaReader;
2324
import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig;
2425
import io.cdap.plugin.db.batch.source.AbstractDBSource;
2526
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -41,6 +42,11 @@ public Db2Source(Db2SourceConfig db2SourceConfig) {
4142
this.db2SourceConfig = db2SourceConfig;
4243
}
4344

45+
/**
 * Supplies the DB2-specific schema reader for this source.
 *
 * @return a new {@link DB2SchemaReader} instance
 */
@Override
46+
protected SchemaReader getSchemaReader() {
47+
return new DB2SchemaReader();
48+
}
49+
4450
@Override
4551
protected Class<? extends DBWritable> getDBRecordType() {
4652
return DB2Record.class;

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2PluginTestBase.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,11 @@ protected static void createTestTables(Connection conn) throws SQLException {
144144
" BLOB_COL BLOB," +
145145
" DATE_COL DATE," +
146146
" TIME_COL TIME," +
147-
" TIMESTAMP_COL TIMESTAMP" +
147+
" TIMESTAMP_COL TIMESTAMP," +
148+
" BINARY_COL BINARY(10)," +
149+
" VARBINARY_COL VARBINARY(10)," +
150+
" VARGRAPHIC_COL VARGRAPHIC(10)," +
151+
" DBCLOB_COL DBCLOB(10)" +
148152
")");
149153
stmt.execute("CREATE TABLE MY_DEST_TABLE LIKE my_table");
150154
stmt.execute("CREATE TABLE your_table LIKE my_table");
@@ -157,11 +161,11 @@ protected static void prepareTestData(Connection conn) throws SQLException {
157161
PreparedStatement pStmt1 =
158162
conn.prepareStatement("INSERT INTO my_table " +
159163
"VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
160-
" ?, ?, ?, ?, ?, ?, ?, ?)");
164+
" ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
161165
PreparedStatement pStmt2 =
162166
conn.prepareStatement("INSERT INTO your_table " +
163167
"VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
164-
" ?, ?, ?, ?, ?, ?, ?, ?)")) {
168+
" ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
165169

166170
stmt.execute("insert into dbActionTest values (1, '1970-01-01')");
167171
stmt.execute("insert into postActionTest values (1, '1970-01-01')");
@@ -193,6 +197,10 @@ private static void populateData(PreparedStatement ...stmts) throws SQLException
193197
pStmt.setDate(16, new Date(CURRENT_TS));
194198
pStmt.setTime(17, new Time(CURRENT_TS));
195199
pStmt.setTimestamp(18, new Timestamp(CURRENT_TS));
200+
pStmt.setBytes(19, name.getBytes());
201+
pStmt.setBytes(20, name.getBytes());
202+
pStmt.setString(21, name);
203+
pStmt.setString(22, name);
196204

197205
pStmt.executeUpdate();
198206
}

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2SinkTestRun.java

Lines changed: 70 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.cdap.cdap.etl.api.batch.BatchSink;
2626
import io.cdap.cdap.etl.mock.batch.MockSource;
2727
import io.cdap.cdap.etl.proto.v2.ETLPlugin;
28+
import io.cdap.cdap.test.ApplicationManager;
2829
import io.cdap.cdap.test.DataSetManager;
2930
import io.cdap.plugin.common.Constants;
3031
import io.cdap.plugin.db.batch.sink.AbstractDBSink;
@@ -51,6 +52,31 @@
5152
* Test for ETL using databases.
5253
*/
5354
public class Db2SinkTestRun extends Db2PluginTestBase {
55+
/**
 * Record schema shared by the sink tests: it is passed to the mock source when an explicit input schema
 * is requested and is used to build the input records. DECFLOAT_COL is declared as string and the
 * binary columns (CHAR_BIT_COL, VARCHAR_BIT_COL, BLOB_COL, BINARY_COL, VARBINARY_COL) as bytes.
 */
private static final Schema SCHEMA = Schema.recordOf(
56+
"dbRecord",
57+
Schema.Field.of("SMALLINT_COL", Schema.of(Schema.Type.INT)),
58+
Schema.Field.of("INTEGER_COL", Schema.of(Schema.Type.INT)),
59+
Schema.Field.of("BIGINT_COL", Schema.of(Schema.Type.LONG)),
60+
Schema.Field.of("DECIMAL_COL", Schema.decimalOf(10, 6)),
61+
Schema.Field.of("NUMERIC_COL", Schema.decimalOf(10, 6)),
62+
Schema.Field.of("DECFLOAT_COL", Schema.of(Schema.Type.STRING)),
63+
Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
64+
Schema.Field.of("DOUBLE_COL", Schema.of(Schema.Type.DOUBLE)),
65+
Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)),
66+
Schema.Field.of("VARCHAR_COL", Schema.of(Schema.Type.STRING)),
67+
Schema.Field.of("CHAR_BIT_COL", Schema.of(Schema.Type.BYTES)),
68+
Schema.Field.of("VARCHAR_BIT_COL", Schema.of(Schema.Type.BYTES)),
69+
Schema.Field.of("GRAPHIC_COL", Schema.of(Schema.Type.STRING)),
70+
Schema.Field.of("CLOB_COL", Schema.of(Schema.Type.STRING)),
71+
Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES)),
72+
Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
73+
Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
74+
Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
75+
Schema.Field.of("BINARY_COL", Schema.of(Schema.Type.BYTES)),
76+
Schema.Field.of("VARBINARY_COL", Schema.of(Schema.Type.BYTES)),
77+
Schema.Field.of("VARGRAPHIC_COL", Schema.of(Schema.Type.STRING)),
78+
Schema.Field.of("DBCLOB_COL", Schema.of(Schema.Type.STRING))
79+
);
5480

5581
@Before
5682
public void setup() throws Exception {
@@ -82,27 +108,55 @@ public void testDBSinkWithDBSchemaAndInvalidData() throws Exception {
82108
}
83109

84110
@Test
85-
public void testDBSink() throws Exception {
111+
public void testDBSinkWithExplicitInputSchema() throws Exception {
112+
testDBSink("testDBSinkWithExplicitInputSchema", "input-dbsinktest-explicit", true);
113+
}
114+
115+
@Test
116+
public void testDBSinkWithInferredInputSchema() throws Exception {
117+
testDBSink("testDBSinkWithInferredInputSchema", "input-dbsinktest-inferred", false);
118+
}
86119

87-
String inputDatasetName = "input-dbsinktest";
120+
public void testDBSink(String appName, String inputDatasetName, boolean setInputSchema) throws Exception {
121+
ETLPlugin sourceConfig = (setInputSchema)
122+
? MockSource.getPlugin(inputDatasetName, SCHEMA)
123+
: MockSource.getPlugin(inputDatasetName);
88124

89-
ETLPlugin sourceConfig = MockSource.getPlugin(inputDatasetName);
90125
ETLPlugin sinkConfig = getSinkConfig();
91126

92-
deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, "testDBSink");
127+
ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, appName);
93128
createInputData(inputDatasetName);
94-
129+
runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
95130

96131
try (Connection conn = createConnection();
97132
Statement stmt = conn.createStatement();
98-
ResultSet resultSet = stmt.executeQuery("SELECT * FROM my_table")) {
133+
ResultSet resultSet = stmt.executeQuery("SELECT * FROM MY_DEST_TABLE ORDER BY SMALLINT_COL")) {
99134
Set<String> users = new HashSet<>();
135+
100136
Assert.assertTrue(resultSet.next());
101137
users.add(resultSet.getString("VARCHAR_COL"));
138+
Assert.assertEquals(1, resultSet.getInt("SMALLINT_COL"));
139+
Assert.assertEquals(1, resultSet.getInt("INTEGER_COL"));
140+
Assert.assertEquals(1L, resultSet.getLong("BIGINT_COL"));
141+
Assert.assertEquals(3.458, resultSet.getBigDecimal("NUMERIC_COL").doubleValue(), 0.00001);
142+
Assert.assertEquals(3.459, resultSet.getBigDecimal("DECIMAL_COL").doubleValue(), 0.00001);
143+
Assert.assertEquals(1.42, resultSet.getDouble("DECFLOAT_COL"), 0.00001);
144+
Assert.assertEquals(25.123f, resultSet.getFloat("REAL_COL"), 0.00001f);
145+
Assert.assertEquals(3.456, resultSet.getDouble("DOUBLE_COL"), 0.00001);
146+
Assert.assertEquals("user1", resultSet.getString("CHAR_COL").trim());
147+
Assert.assertEquals("user1", resultSet.getString("VARCHAR_COL"));
148+
Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("CHAR_BIT_COL"), 0, 5));
149+
Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("VARCHAR_BIT_COL")));
150+
Assert.assertEquals("user1", resultSet.getString("GRAPHIC_COL").trim());
102151
Assert.assertEquals(new Date(CURRENT_TS).toString(), resultSet.getDate("DATE_COL").toString());
103152
Assert.assertEquals(new Time(CURRENT_TS).toString(), resultSet.getTime("TIME_COL").toString());
104153
Assert.assertEquals(new Timestamp(CURRENT_TS), resultSet.getTimestamp("TIMESTAMP_COL"));
154+
Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("BINARY_COL"), 0, 5));
155+
Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("VARBINARY_COL")));
156+
Assert.assertEquals("user1", resultSet.getString("VARGRAPHIC_COL").trim());
157+
Assert.assertEquals("user1", resultSet.getString("DBCLOB_COL"));
105158
Assert.assertTrue(resultSet.next());
159+
users.add(resultSet.getString("VARCHAR_COL"));
106160
Assert.assertEquals("user2", Bytes.toString(resultSet.getBytes("BLOB_COL"), 0, 5));
107161
Assert.assertEquals("user2", resultSet.getString("CLOB_COL"));
108162
Assert.assertEquals(new BigDecimal(3.458, new MathContext(PRECISION)).setScale(SCALE),
@@ -111,7 +165,6 @@ public void testDBSink() throws Exception {
111165
resultSet.getBigDecimal("DECIMAL_COL"));
112166
users.add(resultSet.getString("VARCHAR_COL"));
113167
Assert.assertEquals(ImmutableSet.of("user1", "user2"), users);
114-
115168
}
116169
}
117170

@@ -146,50 +199,33 @@ protected void writeDataForInvalidDataWriteTest(String inputDatasetName, String
146199
private void createInputData(String inputDatasetName) throws Exception {
147200
// add some data to the input table
148201
DataSetManager<Table> inputManager = getDataset(inputDatasetName);
149-
Schema schema = Schema.recordOf(
150-
"dbRecord",
151-
Schema.Field.of("SMALLINT_COL", Schema.of(Schema.Type.INT)),
152-
Schema.Field.of("INTEGER_COL", Schema.of(Schema.Type.INT)),
153-
Schema.Field.of("BIGINT_COL", Schema.of(Schema.Type.LONG)),
154-
Schema.Field.of("DECIMAL_COL", Schema.decimalOf(PRECISION, SCALE)),
155-
Schema.Field.of("NUMERIC_COL", Schema.decimalOf(PRECISION, SCALE)),
156-
Schema.Field.of("DECFLOAT_COL", Schema.of(Schema.Type.DOUBLE)),
157-
Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
158-
Schema.Field.of("DOUBLE_COL", Schema.of(Schema.Type.DOUBLE)),
159-
Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)),
160-
Schema.Field.of("VARCHAR_COL", Schema.of(Schema.Type.STRING)),
161-
Schema.Field.of("CHAR_BIT_COL", Schema.of(Schema.Type.STRING)),
162-
Schema.Field.of("VARCHAR_BIT_COL", Schema.of(Schema.Type.STRING)),
163-
Schema.Field.of("GRAPHIC_COL", Schema.of(Schema.Type.STRING)),
164-
Schema.Field.of("CLOB_COL", Schema.of(Schema.Type.BYTES)),
165-
Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES)),
166-
Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
167-
Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
168-
Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))
169-
);
170202
List<StructuredRecord> inputRecords = new ArrayList<>();
171203
LocalDateTime localDateTime = new Timestamp(CURRENT_TS).toLocalDateTime();
172204
for (int i = 1; i <= 2; i++) {
173205
String name = "user" + i;
174-
inputRecords.add(StructuredRecord.builder(schema)
206+
inputRecords.add(StructuredRecord.builder(SCHEMA)
175207
.set("SMALLINT_COL", i)
176208
.set("INTEGER_COL", i)
177209
.set("BIGINT_COL", (long) i)
178210
.setDecimal("NUMERIC_COL", new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE))
179211
.setDecimal("DECIMAL_COL", new BigDecimal(3.459d, new MathContext(PRECISION)).setScale(SCALE))
180-
.set("DECFLOAT_COL", .42 + i)
181-
.set("REAL_COL", 24f + i)
212+
.set("DECFLOAT_COL", Double.toString(.42 + i))
213+
.set("REAL_COL", 24.123f + i)
182214
.set("DOUBLE_COL", 3.456)
183215
.set("CHAR_COL", name)
184216
.set("VARCHAR_COL", name)
185-
.set("CHAR_BIT_COL", name)
186-
.set("VARCHAR_BIT_COL", name)
217+
.set("CHAR_BIT_COL", name.getBytes())
218+
.set("VARCHAR_BIT_COL", name.getBytes())
187219
.set("GRAPHIC_COL", name)
188-
.set("CLOB_COL", name.getBytes())
220+
.set("CLOB_COL", name)
189221
.set("BLOB_COL", name.getBytes())
190222
.setDate("DATE_COL", localDateTime.toLocalDate())
191223
.setTime("TIME_COL", localDateTime.toLocalTime())
192224
.setTimestamp("TIMESTAMP_COL", localDateTime.atZone(ZoneId.systemDefault()))
225+
.set("BINARY_COL", name.getBytes())
226+
.set("VARBINARY_COL", name.getBytes())
227+
.set("VARGRAPHIC_COL", name)
228+
.set("DBCLOB_COL", name)
193229
.build());
194230
}
195231
MockSource.writeInput(inputManager, inputRecords);

0 commit comments

Comments
 (0)