
Commit 4762c01

CDAP-15546 Oracle db plugin enhancements: all data types support and proper test coverage
1 parent a72dd1c commit 4762c01

File tree

31 files changed: +1407 -585 lines changed

aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresDBRecord.java
Lines changed: 3 additions & 1 deletion

@@ -18,18 +18,20 @@
 
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.db.ColumnType;
 import io.cdap.plugin.db.DBRecord;
 import io.cdap.plugin.db.SchemaReader;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 
 /**
  * Writable class for Aurora DB PostgreSQL Source/Sink
  */
 public class AuroraPostgresDBRecord extends DBRecord {
 
-  public AuroraPostgresDBRecord(StructuredRecord record, int[] columnTypes) {
+  public AuroraPostgresDBRecord(StructuredRecord record, List<ColumnType> columnTypes) {
     super(record, columnTypes);
   }
 

aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSink.java
Lines changed: 0 additions & 1 deletion

@@ -53,7 +53,6 @@ public AuroraPostgresSink(AuroraPostgresSinkConfig auroraPostgresSinkConfig) {
   protected void setColumnsInfo(List<Schema.Field> fields) {
     List<String> columnsList = new ArrayList<>();
     StringJoiner columnsJoiner = new StringJoiner(",");
-
     for (Schema.Field field : fields) {
       columnsList.add(field.getName());
       columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
database-commons/src/main/java/io/cdap/plugin/db/ColumnType.java (new file; path inferred from the package declaration below)
Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db;
+
+/**
+ * Stores SQL column name and type.
+ */
+public class ColumnType {
+
+  private final String name;
+  private final int type;
+
+  public ColumnType(String name, int type) {
+    this.name = name;
+    this.type = type;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getType() {
+    return type;
+  }
+}
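
For reference, the new ColumnType value object above simply pairs a column name with its java.sql.Types code. A minimal usage sketch follows; the table columns named here are hypothetical examples, not part of this commit:

```java
import io.cdap.plugin.db.ColumnType;

import java.sql.Types;
import java.util.Arrays;
import java.util.List;

public class ColumnTypeExample {
  public static void main(String[] args) {
    // Hypothetical columns of an "employees" table, paired with their JDBC type codes.
    List<ColumnType> columnTypes = Arrays.asList(
      new ColumnType("id", Types.INTEGER),
      new ColumnType("name", Types.VARCHAR),
      new ColumnType("salary", Types.NUMERIC));

    for (ColumnType columnType : columnTypes) {
      System.out.println(columnType.getName() + " -> " + columnType.getType());
    }
  }
}
```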

database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java
Lines changed: 24 additions & 17 deletions

@@ -44,6 +44,7 @@
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.List;
+import javax.annotation.Nullable;
 import javax.sql.rowset.serial.SerialBlob;
 
 /**
@@ -59,18 +60,16 @@ public class DBRecord implements Writable, DBWritable, Configurable {
   private final Lazy<Schema> schema = new Lazy<>(this::computeSchema);
 
   /**
-   * Need to cache {@link ResultSetMetaData} of the record for use during writing to a table.
-   * This is because we cannot rely on JDBC drivers to properly set metadata in the {@link PreparedStatement}
-   * passed to the #write method in this class.
+   * Need to cache column types to set fields of the input record on {@link PreparedStatement} in the right order.
    */
-  protected int[] columnTypes;
+  protected List<ColumnType> columnTypes;
 
   /**
   * Used to construct a DBRecord from a StructuredRecord in the ETL Pipeline
   *
   * @param record the {@link StructuredRecord} to construct the {@link DBRecord} from
   */
-  public DBRecord(StructuredRecord record, int[] columnTypes) {
+  public DBRecord(StructuredRecord record, List<ColumnType> columnTypes) {
     this.record = record;
     this.columnTypes = columnTypes;
   }
@@ -114,7 +113,7 @@ public void readFields(ResultSet resultSet) throws SQLException {
     record = recordBuilder.build();
   }
 
-  private Schema getSchema() {
+  protected Schema getSchema() {
     return schema.getOrCompute();
   }
 
@@ -202,10 +201,10 @@ public void write(DataOutput out) throws IOException {
   * @param stmt the {@link PreparedStatement} to write the {@link StructuredRecord} to
   */
  public void write(PreparedStatement stmt) throws SQLException {
-    Schema recordSchema = record.getSchema();
-    List<Schema.Field> schemaFields = recordSchema.getFields();
-    for (int i = 0; i < schemaFields.size(); i++) {
-      writeToDB(stmt, schemaFields.get(i), i);
+    for (int i = 0; i < columnTypes.size(); i++) {
+      ColumnType columnType = columnTypes.get(i);
+      Schema.Field field = record.getSchema().getField(columnType.getName());
+      writeToDB(stmt, field, i);
     }
   }
 
@@ -264,16 +263,24 @@ private void writeToDataOut(DataOutput out, Schema.Field field) throws IOExcepti
     }
   }
 
-  protected void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldIndex) throws SQLException {
+  protected void writeToDB(PreparedStatement stmt, @Nullable Schema.Field field, int fieldIndex) throws SQLException {
+
+    int sqlIndex = fieldIndex + 1;
+    int sqlType = columnTypes.get(fieldIndex).getType();
+    if (field == null) {
+      // Some of the fields can be absent in the record
+      stmt.setNull(sqlIndex, sqlType);
+      return;
+    }
+
     String fieldName = field.getName();
     Schema fieldSchema = getNonNullableSchema(field);
     Schema.Type fieldType = fieldSchema.getType();
     Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
     Object fieldValue = record.get(fieldName);
-    int sqlIndex = fieldIndex + 1;
 
     if (fieldValue == null) {
-      stmt.setNull(sqlIndex, columnTypes[fieldIndex]);
+      stmt.setNull(sqlIndex, columnTypes.get(fieldIndex).getType());
       return;
     }
 
@@ -299,7 +306,7 @@ protected void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldIn
 
     switch (fieldType) {
       case NULL:
-        stmt.setNull(sqlIndex, columnTypes[fieldIndex]);
+        stmt.setNull(sqlIndex, columnTypes.get(fieldIndex).getType());
         break;
       case STRING:
         // clob can also be written to as setString
@@ -333,7 +340,7 @@ protected void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldIn
   protected void writeBytes(PreparedStatement stmt, int fieldIndex, int sqlIndex, Object fieldValue)
     throws SQLException {
     byte[] byteValue = fieldValue instanceof ByteBuffer ? Bytes.toBytes((ByteBuffer) fieldValue) : (byte[]) fieldValue;
-    int parameterType = columnTypes[fieldIndex];
+    int parameterType = columnTypes.get(fieldIndex).getType();
     if (Types.BLOB == parameterType) {
       stmt.setBlob(sqlIndex, new SerialBlob(byteValue));
       return;
@@ -342,9 +349,9 @@ protected void writeBytes(PreparedStatement stmt, int fieldIndex, int sqlIndex,
     stmt.setBytes(sqlIndex, byteValue);
   }
 
-  private void writeInt(PreparedStatement stmt, int fieldIndex, int sqlIndex, Object fieldValue) throws SQLException {
+  protected void writeInt(PreparedStatement stmt, int fieldIndex, int sqlIndex, Object fieldValue) throws SQLException {
     Integer intValue = (Integer) fieldValue;
-    int parameterType = columnTypes[fieldIndex];
+    int parameterType = columnTypes.get(fieldIndex).getType();
     if (Types.TINYINT == parameterType || Types.SMALLINT == parameterType) {
       stmt.setShort(sqlIndex, intValue.shortValue());
       return;
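
The behavioral core of the DBRecord change above: write(PreparedStatement) now iterates the cached column list rather than the record's schema fields, looking each field up by column name, and writeToDB sets NULL for any column the record does not carry. A standalone sketch of that ordering logic, using a plain Map as a stand-in for StructuredRecord and println in place of the real PreparedStatement calls; the column and field names are hypothetical:

```java
import io.cdap.plugin.db.ColumnType;

import java.sql.Types;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WriteOrderSketch {
  public static void main(String[] args) {
    // Target table's columns in DDL order, as the sink would cache them.
    List<ColumnType> columnTypes = Arrays.asList(
      new ColumnType("id", Types.INTEGER),
      new ColumnType("name", Types.VARCHAR),
      new ColumnType("salary", Types.NUMERIC));

    // Stand-in for a StructuredRecord whose schema omits the "salary" field.
    Map<String, Object> record = new HashMap<>();
    record.put("id", 7);
    record.put("name", "Ada");

    // Mirrors DBRecord.write: iterate columns, not record fields, so JDBC
    // parameter indices always line up with the INSERT column list.
    for (int i = 0; i < columnTypes.size(); i++) {
      ColumnType columnType = columnTypes.get(i);
      int sqlIndex = i + 1; // JDBC parameters are 1-based
      if (record.containsKey(columnType.getName())) {
        System.out.println("setObject(" + sqlIndex + ", " + record.get(columnType.getName()) + ")");
      } else {
        // Absent field: the real code calls stmt.setNull(sqlIndex, sqlType).
        System.out.println("setNull(" + sqlIndex + ", " + columnType.getType() + ")");
      }
    }
  }
}
```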

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java
Lines changed: 86 additions & 45 deletions

@@ -38,6 +38,7 @@
 import io.cdap.plugin.common.ReferenceBatchSink;
 import io.cdap.plugin.common.ReferencePluginConfig;
 import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
+import io.cdap.plugin.db.ColumnType;
 import io.cdap.plugin.db.CommonSchemaReader;
 import io.cdap.plugin.db.ConnectionConfig;
 import io.cdap.plugin.db.ConnectionConfigAccessor;
@@ -59,6 +60,7 @@
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -80,10 +82,9 @@ public abstract class AbstractDBSink extends ReferenceBatchSink<StructuredRecord
   private final DBSinkConfig dbSinkConfig;
   private Class<? extends Driver> driverClass;
   private DriverCleanup driverCleanup;
-  protected int[] columnTypes;
   protected List<String> columns;
+  protected List<ColumnType> columnTypes;
   protected String dbColumns;
-  private Schema outputSchema;
 
   public AbstractDBSink(DBSinkConfig dbSinkConfig) {
     super(new ReferencePluginConfig(dbSinkConfig.referenceName));
@@ -116,7 +117,7 @@ public void prepareRun(BatchSinkContext context) {
               dbSinkConfig.jdbcPluginName,
               connectionString);
 
-    outputSchema = context.getInputSchema();
+    Schema outputSchema = context.getInputSchema();
 
     // Load the plugin class to make sure it is available.
     Class<? extends Driver> driverClass = context.loadPluginClass(getJDBCPluginId());
@@ -176,7 +177,7 @@ protected void setColumnsInfo(List<Schema.Field> fields) {
   public void initialize(BatchRuntimeContext context) throws Exception {
     super.initialize(context);
     driverClass = context.loadPluginClass(getJDBCPluginId());
-    outputSchema = Optional.ofNullable(context.getInputSchema()).orElse(inferSchema(driverClass));
+    Schema outputSchema = Optional.ofNullable(context.getInputSchema()).orElse(inferSchema(driverClass));
 
     setColumnsInfo(outputSchema.getFields());
     setResultSetMetadata();
@@ -209,23 +210,11 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
 
   @Override
   public void transform(StructuredRecord input, Emitter<KeyValue<DBRecord, NullWritable>> emitter) {
-    // Create StructuredRecord that only has the columns in this.columns
-    List<Schema.Field> outputFields = new ArrayList<>();
-    for (Schema.Field field : input.getSchema().getFields()) {
-      Preconditions.checkArgument(columns.contains(field.getName()), "Input field '%s' is not found in columns",
-                                  field.getName());
-      outputFields.add(field);
-    }
-    StructuredRecord.Builder output = StructuredRecord.builder(outputSchema);
-    for (String column : columns) {
-      output.set(column, input.get(column));
-    }
-
-    emitter.emit(new KeyValue<>(getDBRecord(output), null));
+    emitter.emit(new KeyValue<>(getDBRecord(input), null));
   }
 
-  protected DBRecord getDBRecord(StructuredRecord.Builder output) {
-    return new DBRecord(output.build(), columnTypes);
+  protected DBRecord getDBRecord(StructuredRecord output) {
+    return new DBRecord(output, columnTypes);
   }
 
   protected SchemaReader getSchemaReader() {
@@ -272,12 +261,12 @@ private void setResultSetMetadata() throws Exception {
       }
     }
 
-    columnTypes = new int[columns.size()];
-    for (int i = 0; i < columnTypes.length; i++) {
-      String name = columns.get(i);
-      Preconditions.checkArgument(columnToType.containsKey(name), "Missing column '%s' in SQL table", name);
-      columnTypes[i] = columnToType.get(name);
-    }
+    this.columnTypes = columns.stream()
+      .map(name -> {
+        Preconditions.checkArgument(columnToType.containsKey(name), "Missing column '%s' in SQL table", name);
+        return new ColumnType(name, columnToType.get(name));
+      })
+      .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
   }
 
   private void validateSchema(Class<? extends Driver> jdbcDriverClass, String tableName, Schema inputSchema) {
@@ -324,7 +313,23 @@ private void validateFields(Schema inputSchema, ResultSet rs) throws SQLExceptio
     Set<String> invalidFields = new HashSet<>();
     for (Schema.Field field : inputSchema.getFields()) {
       int columnIndex = rs.findColumn(field.getName());
+      boolean isColumnNullable = (ResultSetMetaData.columnNullable == rsMetaData.isNullable(columnIndex));
+      boolean isNotNullAssignable = !isColumnNullable && field.getSchema().isNullable();
+      if (isNotNullAssignable) {
+        LOG.error("Field '{}' was given as nullable but the database column is not nullable", field.getName());
+        invalidFields.add(field.getName());
+      }
+
       if (!isFieldCompatible(field, rsMetaData, columnIndex)) {
+        String sqlTypeName = rsMetaData.getColumnTypeName(columnIndex);
+        Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
+        Schema.Type fieldType = fieldSchema.getType();
+        Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
+        LOG.error("Field '{}' was given as type '{}' but the database column is actually of type '{}'.",
+                  field.getName(),
+                  fieldLogicalType != null ? fieldLogicalType.getToken() : fieldType,
+                  sqlTypeName
+        );
         invalidFields.add(field.getName());
       }
     }
@@ -335,34 +340,70 @@ private void validateFields(Schema inputSchema, ResultSet rs) throws SQLExceptio
   }
 
   /**
-   * Checks if field of the input schema is compatible with corresponding database column.
+   * Checks if field is compatible to be written into database column of the given sql index.
    * @param field field of the explicit input schema.
    * @param metadata resultSet metadata.
   * @param index sql column index.
-   * @return 'true' if field is compatible, 'false' otherwise.
+   * @return 'true' if field is compatible to be written, 'false' otherwise.
   */
   protected boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, int index) throws SQLException {
-    boolean isColumnNullable = (ResultSetMetaData.columnNullable == metadata.isNullable(index));
-    boolean isNotNullAssignable = !isColumnNullable && field.getSchema().isNullable();
-    if (isNotNullAssignable) {
-      LOG.error("Field '{}' was given as nullable but the database column is not nullable", field.getName());
-      return false;
+    Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
+    Schema.Type fieldType = fieldSchema.getType();
+    Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
+
+    int sqlType = metadata.getColumnType(index);
+
+    // Handle logical types first
+    if (fieldLogicalType != null) {
+      switch (fieldLogicalType) {
+        case DATE:
+          return sqlType == Types.DATE;
+        case TIME_MILLIS:
+        case TIME_MICROS:
+          return sqlType == Types.TIME;
+        case TIMESTAMP_MILLIS:
+        case TIMESTAMP_MICROS:
+          return sqlType == Types.TIMESTAMP;
+        case DECIMAL:
+          return sqlType == Types.NUMERIC
+            || sqlType == Types.DECIMAL;
+      }
     }
 
-    int type = metadata.getColumnType(index);
-    int precision = metadata.getPrecision(index);
-    int scale = metadata.getScale(index);
-
-    Schema inputFieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
-    Schema outputFieldSchema = DBUtils.getSchema(type, precision, scale);
-    if (!Objects.equals(inputFieldSchema.getType(), outputFieldSchema.getType()) ||
-        !Objects.equals(inputFieldSchema.getLogicalType(), outputFieldSchema.getLogicalType())) {
-      LOG.error("Field '{}' was given as type '{}' but the database column is actually of type '{}'.",
-                field.getName(), inputFieldSchema.getType(), outputFieldSchema.getType());
-      return false;
+    switch (fieldType) {
+      case NULL:
+        return true;
+      case BOOLEAN:
+        return sqlType == Types.BOOLEAN
+          || sqlType == Types.BIT;
+      case INT:
+        return sqlType == Types.INTEGER
+          || sqlType == Types.SMALLINT
+          || sqlType == Types.TINYINT;
+      case LONG:
+        return sqlType == Types.BIGINT;
+      case FLOAT:
+        return sqlType == Types.REAL
+          || sqlType == Types.FLOAT;
+      case DOUBLE:
+        return sqlType == Types.DOUBLE;
+      case BYTES:
+        return sqlType == Types.BINARY
+          || sqlType == Types.VARBINARY
+          || sqlType == Types.LONGVARBINARY
+          || sqlType == Types.BLOB;
+      case STRING:
+        return sqlType == Types.VARCHAR
+          || sqlType == Types.CHAR
+          || sqlType == Types.CLOB
+          || sqlType == Types.LONGNVARCHAR
+          || sqlType == Types.LONGVARCHAR
+          || sqlType == Types.NCHAR
+          || sqlType == Types.NCLOB
+          || sqlType == Types.NVARCHAR;
      default:
        return false;
     }
-
-    return true;
   }
 
   private void emitLineage(BatchSinkContext context, List<Schema.Field> fields) {
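
The rewritten isFieldCompatible above replaces the old exact schema comparison (via DBUtils.getSchema) with an explicit whitelist of JDBC types per CDAP type, so for example an INT field may now target SMALLINT or TINYINT columns. A condensed, self-contained mirror of that whitelist for the non-logical types; the enum below is a stand-in for CDAP's Schema.Type, introduced only to keep the sketch compilable:

```java
import java.sql.Types;

public class TypeCompatibilitySketch {
  // Stand-in for io.cdap.cdap.api.data.schema.Schema.Type, to keep this self-contained.
  enum FieldType { NULL, BOOLEAN, INT, LONG, FLOAT, DOUBLE, BYTES, STRING }

  // Condensed mirror of the whitelist in AbstractDBSink#isFieldCompatible.
  static boolean isCompatible(FieldType fieldType, int sqlType) {
    switch (fieldType) {
      case NULL:
        return true;
      case BOOLEAN:
        return sqlType == Types.BOOLEAN || sqlType == Types.BIT;
      case INT:
        return sqlType == Types.INTEGER || sqlType == Types.SMALLINT || sqlType == Types.TINYINT;
      case LONG:
        return sqlType == Types.BIGINT;
      case FLOAT:
        return sqlType == Types.REAL || sqlType == Types.FLOAT;
      case DOUBLE:
        return sqlType == Types.DOUBLE;
      case BYTES:
        return sqlType == Types.BINARY || sqlType == Types.VARBINARY
          || sqlType == Types.LONGVARBINARY || sqlType == Types.BLOB;
      case STRING:
        return sqlType == Types.VARCHAR || sqlType == Types.CHAR || sqlType == Types.CLOB
          || sqlType == Types.LONGNVARCHAR || sqlType == Types.LONGVARCHAR
          || sqlType == Types.NCHAR || sqlType == Types.NCLOB || sqlType == Types.NVARCHAR;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isCompatible(FieldType.INT, Types.SMALLINT)); // true: widening is allowed
    System.out.println(isCompatible(FieldType.LONG, Types.INTEGER)); // false: would narrow
  }
}
```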
