Skip to content

Commit 3008122

Browse files
committed
CDAP-15551 DB2 db plugin enhancements: all data types support + proper test coverage
1 parent 076cdaa commit 3008122

File tree

5 files changed

+93
-40
lines changed

5 files changed

+93
-40
lines changed

database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -156,6 +156,10 @@ private Schema loadSchemaFromDB(Class<? extends Driver> driverClass)
156156
try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
157157
executeInitQueries(connection, sourceConfig.getInitQueries());
158158
return loadSchemaFromDB(connection, sourceConfig.importQuery);
159+
160+
} catch (SQLException e) {
161+
LOG.error("SQLException while performing getSchema", e);
162+
throw new SQLException(e.getMessage(), e.getSQLState(), e.getErrorCode());
159163
} finally {
160164
driverCleanup.destroy();
161165
}

db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
import io.cdap.cdap.api.annotation.Name;
2121
import io.cdap.cdap.api.annotation.Plugin;
2222
import io.cdap.cdap.etl.api.batch.BatchSource;
23+
import io.cdap.plugin.db.SchemaReader;
2324
import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig;
2425
import io.cdap.plugin.db.batch.source.AbstractDBSource;
2526
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -41,6 +42,11 @@ public Db2Source(Db2SourceConfig db2SourceConfig) {
4142
this.db2SourceConfig = db2SourceConfig;
4243
}
4344

45+
/**
 * Overrides the inherited hook so that this source reads schemas through a
 * {@code DB2SchemaReader}.
 *
 * @return a newly created {@code DB2SchemaReader}; a fresh instance is built on every call
 */
@Override
46+
protected SchemaReader getSchemaReader() {
47+
return new DB2SchemaReader();
48+
}
49+
4450
@Override
4551
protected Class<? extends DBWritable> getDBRecordType() {
4652
return DB2Record.class;

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2PluginTestBase.java

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -144,7 +144,11 @@ protected static void createTestTables(Connection conn) throws SQLException {
144144
" BLOB_COL BLOB," +
145145
" DATE_COL DATE," +
146146
" TIME_COL TIME," +
147-
" TIMESTAMP_COL TIMESTAMP" +
147+
" TIMESTAMP_COL TIMESTAMP," +
148+
" BINARY_COL BINARY(10)," +
149+
" VARBINARY_COL VARBINARY(10)," +
150+
" VARGRAPHIC_COL VARGRAPHIC(10)," +
151+
" DBCLOB_COL DBCLOB(10)" +
148152
")");
149153
stmt.execute("CREATE TABLE MY_DEST_TABLE LIKE my_table");
150154
stmt.execute("CREATE TABLE your_table LIKE my_table");
@@ -157,11 +161,11 @@ protected static void prepareTestData(Connection conn) throws SQLException {
157161
PreparedStatement pStmt1 =
158162
conn.prepareStatement("INSERT INTO my_table " +
159163
"VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
160-
" ?, ?, ?, ?, ?, ?, ?, ?)");
164+
" ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
161165
PreparedStatement pStmt2 =
162166
conn.prepareStatement("INSERT INTO your_table " +
163167
"VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
164-
" ?, ?, ?, ?, ?, ?, ?, ?)")) {
168+
" ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
165169

166170
stmt.execute("insert into dbActionTest values (1, '1970-01-01')");
167171
stmt.execute("insert into postActionTest values (1, '1970-01-01')");
@@ -193,6 +197,10 @@ private static void populateData(PreparedStatement ...stmts) throws SQLException
193197
pStmt.setDate(16, new Date(CURRENT_TS));
194198
pStmt.setTime(17, new Time(CURRENT_TS));
195199
pStmt.setTimestamp(18, new Timestamp(CURRENT_TS));
200+
pStmt.setBytes(19, name.getBytes());
201+
pStmt.setBytes(20, name.getBytes());
202+
pStmt.setString(21, name);
203+
pStmt.setString(22, name);
196204

197205
pStmt.executeUpdate();
198206
}

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2SinkTestRun.java

Lines changed: 61 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
import io.cdap.cdap.etl.api.batch.BatchSink;
2626
import io.cdap.cdap.etl.mock.batch.MockSource;
2727
import io.cdap.cdap.etl.proto.v2.ETLPlugin;
28+
import io.cdap.cdap.test.ApplicationManager;
2829
import io.cdap.cdap.test.DataSetManager;
2930
import io.cdap.plugin.common.Constants;
3031
import io.cdap.plugin.db.batch.sink.AbstractDBSink;
@@ -51,6 +52,31 @@
5152
* Test for ETL using databases.
5253
*/
5354
public class Db2SinkTestRun extends Db2PluginTestBase {
55+
/**
 * Record schema (named "dbRecord") shared by the sink tests in this class. It is passed to
 * MockSource.getPlugin(...) when a test asks for an explicit input schema, and it is the
 * schema used to build the input records in createInputData(...).
 *
 * The field list covers 22 columns; the last four (BINARY_COL, VARBINARY_COL,
 * VARGRAPHIC_COL, DBCLOB_COL) match the columns added to the test table definition.
 *
 * NOTE(review): DECIMAL_COL and NUMERIC_COL are declared with the literals 10 and 6, while
 * the record builder and the assertions use PRECISION and SCALE. Confirm the values agree,
 * or reference the constants here so the two cannot drift apart.
 */
private static final Schema SCHEMA = Schema.recordOf(
56+
"dbRecord",
57+
Schema.Field.of("SMALLINT_COL", Schema.of(Schema.Type.INT)),
58+
Schema.Field.of("INTEGER_COL", Schema.of(Schema.Type.INT)),
59+
Schema.Field.of("BIGINT_COL", Schema.of(Schema.Type.LONG)),
60+
Schema.Field.of("DECIMAL_COL", Schema.decimalOf(10, 6)),
61+
Schema.Field.of("NUMERIC_COL", Schema.decimalOf(10, 6)),
62+
Schema.Field.of("DECFLOAT_COL", Schema.of(Schema.Type.DOUBLE)),
63+
Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
64+
Schema.Field.of("DOUBLE_COL", Schema.of(Schema.Type.DOUBLE)),
65+
Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)),
66+
Schema.Field.of("VARCHAR_COL", Schema.of(Schema.Type.STRING)),
67+
Schema.Field.of("CHAR_BIT_COL", Schema.of(Schema.Type.BYTES)),
68+
Schema.Field.of("VARCHAR_BIT_COL", Schema.of(Schema.Type.BYTES)),
69+
Schema.Field.of("GRAPHIC_COL", Schema.of(Schema.Type.STRING)),
70+
Schema.Field.of("CLOB_COL", Schema.of(Schema.Type.STRING)),
71+
Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES)),
72+
Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
73+
Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
74+
Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
75+
Schema.Field.of("BINARY_COL", Schema.of(Schema.Type.BYTES)),
76+
Schema.Field.of("VARBINARY_COL", Schema.of(Schema.Type.BYTES)),
77+
Schema.Field.of("VARGRAPHIC_COL", Schema.of(Schema.Type.STRING)),
78+
Schema.Field.of("DBCLOB_COL", Schema.of(Schema.Type.STRING))
79+
);
5480

5581
@Before
5682
public void setup() throws Exception {
@@ -81,28 +107,46 @@ public void testDBSinkWithDBSchemaAndInvalidData() throws Exception {
81107
}
82108
}
83109

84-
@Test
85-
public void testDBSink() throws Exception {
86-
87-
String inputDatasetName = "input-dbsinktest";
110+
public void testDBSink(String appName, String inputDatasetName, boolean setInputSchema) throws Exception {
111+
ETLPlugin sourceConfig = (setInputSchema)
112+
? MockSource.getPlugin(inputDatasetName, SCHEMA)
113+
: MockSource.getPlugin(inputDatasetName);
88114

89-
ETLPlugin sourceConfig = MockSource.getPlugin(inputDatasetName);
90115
ETLPlugin sinkConfig = getSinkConfig();
91116

92-
deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, "testDBSink");
117+
ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, appName);
93118
createInputData(inputDatasetName);
94-
119+
runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
95120

96121
try (Connection conn = createConnection();
97122
Statement stmt = conn.createStatement();
98-
ResultSet resultSet = stmt.executeQuery("SELECT * FROM my_table")) {
123+
ResultSet resultSet = stmt.executeQuery("SELECT * FROM MY_DEST_TABLE ORDER BY SMALLINT_COL")) {
99124
Set<String> users = new HashSet<>();
125+
100126
Assert.assertTrue(resultSet.next());
101127
users.add(resultSet.getString("VARCHAR_COL"));
128+
Assert.assertEquals(1, resultSet.getInt("SMALLINT_COL"));
129+
Assert.assertEquals(1, resultSet.getInt("INTEGER_COL"));
130+
Assert.assertEquals(1L, resultSet.getLong("BIGINT_COL"));
131+
Assert.assertEquals(3.458, resultSet.getBigDecimal("NUMERIC_COL").doubleValue(), 0.00001);
132+
Assert.assertEquals(3.459, resultSet.getBigDecimal("DECIMAL_COL").doubleValue(), 0.00001);
133+
Assert.assertEquals(1.42, resultSet.getDouble("DECFLOAT_COL"), 0.00001);
134+
Assert.assertEquals(25.123f, resultSet.getFloat("REAL_COL"), 0.00001f);
135+
Assert.assertEquals(3.456, resultSet.getDouble("DOUBLE_COL"), 0.00001);
136+
Assert.assertEquals("user1", resultSet.getString("CHAR_COL").trim());
137+
Assert.assertEquals("user1", resultSet.getString("VARCHAR_COL"));
138+
Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("CHAR_BIT_COL"), 0, 5));
139+
Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("VARCHAR_BIT_COL")));
140+
Assert.assertEquals("user1", resultSet.getString("GRAPHIC_COL").trim());
102141
Assert.assertEquals(new Date(CURRENT_TS).toString(), resultSet.getDate("DATE_COL").toString());
103142
Assert.assertEquals(new Time(CURRENT_TS).toString(), resultSet.getTime("TIME_COL").toString());
104143
Assert.assertEquals(new Timestamp(CURRENT_TS), resultSet.getTimestamp("TIMESTAMP_COL"));
144+
Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("BINARY_COL"), 0, 5));
145+
Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("VARBINARY_COL")));
146+
Assert.assertEquals("user1", resultSet.getString("VARGRAPHIC_COL").trim());
147+
Assert.assertEquals("user1", resultSet.getString("DBCLOB_COL"));
105148
Assert.assertTrue(resultSet.next());
149+
users.add(resultSet.getString("VARCHAR_COL"));
106150
Assert.assertEquals("user2", Bytes.toString(resultSet.getBytes("BLOB_COL"), 0, 5));
107151
Assert.assertEquals("user2", resultSet.getString("CLOB_COL"));
108152
Assert.assertEquals(new BigDecimal(3.458, new MathContext(PRECISION)).setScale(SCALE),
@@ -111,7 +155,6 @@ public void testDBSink() throws Exception {
111155
resultSet.getBigDecimal("DECIMAL_COL"));
112156
users.add(resultSet.getString("VARCHAR_COL"));
113157
Assert.assertEquals(ImmutableSet.of("user1", "user2"), users);
114-
115158
}
116159
}
117160

@@ -146,50 +189,33 @@ protected void writeDataForInvalidDataWriteTest(String inputDatasetName, String
146189
private void createInputData(String inputDatasetName) throws Exception {
147190
// add some data to the input table
148191
DataSetManager<Table> inputManager = getDataset(inputDatasetName);
149-
Schema schema = Schema.recordOf(
150-
"dbRecord",
151-
Schema.Field.of("SMALLINT_COL", Schema.of(Schema.Type.INT)),
152-
Schema.Field.of("INTEGER_COL", Schema.of(Schema.Type.INT)),
153-
Schema.Field.of("BIGINT_COL", Schema.of(Schema.Type.LONG)),
154-
Schema.Field.of("DECIMAL_COL", Schema.decimalOf(PRECISION, SCALE)),
155-
Schema.Field.of("NUMERIC_COL", Schema.decimalOf(PRECISION, SCALE)),
156-
Schema.Field.of("DECFLOAT_COL", Schema.of(Schema.Type.DOUBLE)),
157-
Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
158-
Schema.Field.of("DOUBLE_COL", Schema.of(Schema.Type.DOUBLE)),
159-
Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)),
160-
Schema.Field.of("VARCHAR_COL", Schema.of(Schema.Type.STRING)),
161-
Schema.Field.of("CHAR_BIT_COL", Schema.of(Schema.Type.STRING)),
162-
Schema.Field.of("VARCHAR_BIT_COL", Schema.of(Schema.Type.STRING)),
163-
Schema.Field.of("GRAPHIC_COL", Schema.of(Schema.Type.STRING)),
164-
Schema.Field.of("CLOB_COL", Schema.of(Schema.Type.BYTES)),
165-
Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES)),
166-
Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
167-
Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
168-
Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))
169-
);
170192
List<StructuredRecord> inputRecords = new ArrayList<>();
171193
LocalDateTime localDateTime = new Timestamp(CURRENT_TS).toLocalDateTime();
172194
for (int i = 1; i <= 2; i++) {
173195
String name = "user" + i;
174-
inputRecords.add(StructuredRecord.builder(schema)
196+
inputRecords.add(StructuredRecord.builder(SCHEMA)
175197
.set("SMALLINT_COL", i)
176198
.set("INTEGER_COL", i)
177199
.set("BIGINT_COL", (long) i)
178200
.setDecimal("NUMERIC_COL", new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE))
179201
.setDecimal("DECIMAL_COL", new BigDecimal(3.459d, new MathContext(PRECISION)).setScale(SCALE))
180202
.set("DECFLOAT_COL", .42 + i)
181-
.set("REAL_COL", 24f + i)
203+
.set("REAL_COL", 24.123f + i)
182204
.set("DOUBLE_COL", 3.456)
183205
.set("CHAR_COL", name)
184206
.set("VARCHAR_COL", name)
185-
.set("CHAR_BIT_COL", name)
186-
.set("VARCHAR_BIT_COL", name)
207+
.set("CHAR_BIT_COL", name.getBytes())
208+
.set("VARCHAR_BIT_COL", name.getBytes())
187209
.set("GRAPHIC_COL", name)
188-
.set("CLOB_COL", name.getBytes())
210+
.set("CLOB_COL", name)
189211
.set("BLOB_COL", name.getBytes())
190212
.setDate("DATE_COL", localDateTime.toLocalDate())
191213
.setTime("TIME_COL", localDateTime.toLocalTime())
192214
.setTimestamp("TIMESTAMP_COL", localDateTime.atZone(ZoneId.systemDefault()))
215+
.set("BINARY_COL", name.getBytes())
216+
.set("VARBINARY_COL", name.getBytes())
217+
.set("VARGRAPHIC_COL", name)
218+
.set("DBCLOB_COL", name)
193219
.build());
194220
}
195221
MockSource.writeInput(inputManager, inputRecords);

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2SourceTestRun.java

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -86,8 +86,10 @@ public void testDBMacroSupport() throws Exception {
8686
public void testDBSource() throws Exception {
8787
String importQuery = "SELECT SMALLINT_COL, INTEGER_COL, BIGINT_COL, DECIMAL_COL, NUMERIC_COL, " +
8888
" REAL_COL, DOUBLE_COL, CHAR_COL, DECFLOAT_COL, VARCHAR_COL, CHAR_BIT_COL, VARCHAR_BIT_COL, GRAPHIC_COL, " +
89-
" CLOB_COL, BLOB_COL, DATE_COL, TIME_COL, TIMESTAMP_COL FROM my_table " +
89+
" CLOB_COL, BLOB_COL, DATE_COL, TIME_COL, TIMESTAMP_COL, BINARY_COL, VARBINARY_COL, VARGRAPHIC_COL, DBCLOB_COL FROM my_table " +
9090
" WHERE SMALLINT_COL < 3 AND $CONDITIONS";
91+
92+
9193
String boundingQuery = "SELECT MIN(SMALLINT_COL),MAX(SMALLINT_COL) from my_table";
9294
String splitBy = "SMALLINT_COL";
9395
ETLPlugin sourceConfig = new ETLPlugin(
@@ -149,7 +151,10 @@ public void testDBSource() throws Exception {
149151
Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("VARCHAR_BIT_COL")).array(), 0, 5));
150152
Assert.assertEquals("user1", row1.get("GRAPHIC_COL").toString().trim());
151153
Assert.assertEquals("user2", row2.get("GRAPHIC_COL").toString().trim());
152-
154+
Assert.assertEquals("user1", row1.get("VARGRAPHIC_COL").toString().trim());
155+
Assert.assertEquals("user2", row2.get("VARGRAPHIC_COL").toString().trim());
156+
Assert.assertEquals("user1", row1.get("DBCLOB_COL").toString().trim());
157+
Assert.assertEquals("user2", row2.get("DBCLOB_COL").toString().trim());
153158
// Verify time columns
154159
java.util.Date date = new java.util.Date(CURRENT_TS);
155160
LocalDate expectedDate = date.toInstant()
@@ -166,6 +171,10 @@ public void testDBSource() throws Exception {
166171
Assert.assertEquals("user2", row2.get("CLOB_COL"));
167172
Assert.assertEquals("user1", Bytes.toString((ByteBuffer) row1.get("BLOB_COL")));
168173
Assert.assertEquals("user2", Bytes.toString((ByteBuffer) row2.get("BLOB_COL")));
174+
Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("BINARY_COL")).array(), 0, 5));
175+
Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("BINARY_COL")).array(), 0, 5));
176+
Assert.assertEquals("user1", Bytes.toString((ByteBuffer) row1.get("VARBINARY_COL")));
177+
Assert.assertEquals("user2", Bytes.toString((ByteBuffer) row2.get("VARBINARY_COL")));
169178
}
170179

171180
@Test

0 commit comments

Comments
 (0)