Skip to content

Commit 076cdaa

Browse files
authored
Merge pull request #45 from vladhlinskiy/feature/CDAP-15549-mssql-all-data-types
CDAP-15549 MS SQL db plugin enhancements: all data types support and proper test coverage
2 parents 973a744 + 23e3e79 commit 076cdaa

File tree

20 files changed

+855
-209
lines changed

20 files changed

+855
-209
lines changed

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
3737
ResultSetMetaData metadata = resultSet.getMetaData();
3838
// ResultSetMetadata columns are numbered starting with 1
3939
for (int i = 1; i <= metadata.getColumnCount(); i++) {
40+
if (shouldIgnoreColumn(metadata, i)) {
41+
continue;
42+
}
4043
String columnName = metadata.getColumnName(i);
4144
Schema columnSchema = getSchema(metadata, i);
4245
if (ResultSetMetaData.columnNullable == metadata.isNullable(i)) {
@@ -120,4 +123,9 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
120123

121124
return Schema.of(type);
122125
}
126+
127+
@Override
128+
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
129+
return false; // this implementation never ignores a column: neither argument is inspected
130+
}
123131
}

database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ private void writeToDataOut(DataOutput out, Schema.Field field) throws IOExcepti
264264
}
265265
}
266266

267-
private void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldIndex) throws SQLException {
267+
protected void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldIndex) throws SQLException {
268268
String fieldName = field.getName();
269269
Schema fieldSchema = getNonNullableSchema(field);
270270
Schema.Type fieldType = fieldSchema.getType();
@@ -330,7 +330,8 @@ private void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldInde
330330
}
331331
}
332332

333-
private void writeBytes(PreparedStatement stmt, int fieldIndex, int sqlIndex, Object fieldValue) throws SQLException {
333+
protected void writeBytes(PreparedStatement stmt, int fieldIndex, int sqlIndex, Object fieldValue)
334+
throws SQLException {
334335
byte[] byteValue = fieldValue instanceof ByteBuffer ? Bytes.toBytes((ByteBuffer) fieldValue) : (byte[]) fieldValue;
335336
int parameterType = columnTypes[fieldIndex];
336337
if (Types.BLOB == parameterType) {

database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.sql.ResultSetMetaData;
2323
import java.sql.SQLException;
2424
import java.util.List;
25-
import javax.annotation.Nullable;
2625

2726
/**
2827
* Main Interface to read db specific types.
@@ -49,4 +48,20 @@ public interface SchemaReader {
4948
* @throws SQLException
5049
*/
5150
Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException;
51+
52+
/**
53+
* Indicates whether the sql column at the given index must be ignored and not included in the output schema.
54+
* This allows Sink and Source plugins to support different sets of data types.
55+
* <p/>
56+
* For example, in MS SQL 'TIMESTAMP' is a synonym for the 'ROWVERSION' data type, values of which are
57+
* automatically generated and can not be inserted or updated. Therefore 'TIMESTAMP' can not be supported by the
58+
* Sink plugin. 'TIMESTAMP' is reported as a non-nullable column by the JDBC connector, so the inferred schema
59+
* will also be non-nullable for this field, requiring a value to be set. By ignoring this column we avoid a
60+
* schema validation failure.
61+
* @param metadata resultSet metadata
62+
* @param index sql column index
63+
* @return 'true' if the sql column must not be included in the output schema, 'false' otherwise
64+
* @throws SQLException if the implementation fails to read the column metadata
65+
*/
66+
boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException;
5267
}

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -322,46 +322,49 @@ private void validateFields(Schema inputSchema, ResultSet rs) throws SQLExceptio
322322

323323
Preconditions.checkNotNull(inputSchema.getFields());
324324
Set<String> invalidFields = new HashSet<>();
325-
326325
for (Schema.Field field : inputSchema.getFields()) {
327326
int columnIndex = rs.findColumn(field.getName());
328-
int type = rsMetaData.getColumnType(columnIndex);
329-
int precision = rsMetaData.getPrecision(columnIndex);
330-
int scale = rsMetaData.getScale(columnIndex);
331-
332-
Schema columnSchema = DBUtils.getSchema(type, precision, scale);
333-
boolean isColumnNullable = (ResultSetMetaData.columnNullable == rsMetaData.isNullable(columnIndex));
334-
335-
Schema.Type columnType = columnSchema.getType();
336-
Schema.LogicalType columnLogicalType = columnSchema.getLogicalType();
337-
338-
Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
339-
Schema.Type fieldType = fieldSchema.getType();
340-
Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
341-
342-
boolean isNotNullAssignable = !isColumnNullable && field.getSchema().isNullable();
343-
boolean isNotCompatible = !(Objects.equals(fieldType, columnType)
344-
&& Objects.equals(fieldLogicalType, columnLogicalType));
345-
346-
if (isNotCompatible) {
347-
invalidFields.add(field.getName());
348-
LOG.error("Field {} was given as type {} but the database column is actually of type {}.",
349-
field.getName(),
350-
fieldLogicalType != null ? fieldLogicalType.getToken() : fieldType,
351-
columnLogicalType != null ? columnLogicalType.getToken() : columnType
352-
);
353-
}
354-
if (isNotNullAssignable) {
327+
if (!isFieldCompatible(field, rsMetaData, columnIndex)) {
355328
invalidFields.add(field.getName());
356-
LOG.error("Field {} was given as nullable but the database column is not nullable", field.getName());
357329
}
358330
}
359331

360332
Preconditions.checkArgument(invalidFields.isEmpty(),
361-
"Couldn't find matching database column(s) for input field(s) %s.",
333+
"Couldn't find matching database column(s) for input field(s) '%s'.",
362334
String.join(",", invalidFields));
363335
}
364336

337+
/**
338+
* Checks if field of the input schema is compatible with corresponding database column.
339+
* @param field field of the explicit input schema.
340+
* @param metadata resultSet metadata.
341+
* @param index sql column index.
342+
* @return 'true' if field is compatible, 'false' otherwise.
343+
*/
344+
protected boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, int index) throws SQLException {
345+
boolean isColumnNullable = (ResultSetMetaData.columnNullable == metadata.isNullable(index));
346+
boolean isNotNullAssignable = !isColumnNullable && field.getSchema().isNullable();
347+
if (isNotNullAssignable) {
348+
LOG.error("Field '{}' was given as nullable but the database column is not nullable", field.getName());
349+
return false;
350+
}
351+
352+
int type = metadata.getColumnType(index);
353+
int precision = metadata.getPrecision(index);
354+
int scale = metadata.getScale(index);
355+
356+
Schema inputFieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
357+
Schema outputFieldSchema = DBUtils.getSchema(type, precision, scale);
358+
if (!Objects.equals(inputFieldSchema.getType(), outputFieldSchema.getType()) ||
359+
!Objects.equals(inputFieldSchema.getLogicalType(), outputFieldSchema.getLogicalType())) {
360+
LOG.error("Field '{}' was given as type '{}' but the database column is actually of type '{}'.",
361+
field.getName(), inputFieldSchema.getType(), outputFieldSchema.getType());
362+
return false;
363+
}
364+
365+
return true;
366+
}
367+
365368
private void emitLineage(BatchSinkContext context, List<Schema.Field> fields) {
366369
LineageRecorder lineageRecorder = new LineageRecorder(context, dbSinkConfig.referenceName);
367370

database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.lang.reflect.Field;
3535
import java.lang.reflect.Method;
3636
import java.math.BigDecimal;
37-
import java.math.MathContext;
3837
import java.sql.Blob;
3938
import java.sql.Clob;
4039
import java.sql.Driver;
@@ -258,8 +257,7 @@ public static Object transformValue(int sqlType, int precision, int scale,
258257
return ((Number) original).intValue();
259258
case Types.NUMERIC:
260259
case Types.DECIMAL:
261-
BigDecimal decimal = (BigDecimal) original;
262-
return new BigDecimal(decimal.unscaledValue(), scale, new MathContext(precision));
260+
return (BigDecimal) original;
263261
case Types.DATE:
264262
return resultSet.getDate(columnIndex);
265263
case Types.TIME:
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db;
18+
19+
import org.junit.Assert;
20+
21+
/**
22+
* Test util methods for custom assertions.
23+
*/
24+
public final class CustomAssertions {
25+
26+
/**
27+
* The maximum delta between expected and actual floating point number for which both numbers are still considered
28+
* equal.
29+
*/
30+
public static final double DELTA = 0.000001;
31+
32+
private CustomAssertions() {
33+
throw new AssertionError("Should not instantiate static utility class.");
34+
}
35+
36+
/**
37+
* Reuses {@link Assert#assertEquals(Object, Object)}. Added to prevent 'Ambiguous method call' issue.
38+
* Asserts that two objects are equal. If they are not, an {@link AssertionError} without a message is thrown.
39+
* If expected and actual are null, they are considered equal.
40+
*
41+
* @param expected expected value
42+
* @param actual the value to check against expected
43+
*/
44+
public static void assertObjectEquals(Object expected, Object actual) {
45+
Assert.assertEquals(expected, actual);
46+
}
47+
48+
/**
49+
* Reuses {@link Assert#assertEquals(double, double, double)} with default {@link CustomAssertions#DELTA}.
50+
* Added to prevent repetitive casts to 'double' and specifying delta. Asserts that two doubles are equal to within
51+
* the delta. If they are not, an {@link AssertionError} without a message is thrown.
52+
*
53+
* @param expected expected value
54+
* @param actual the value to check against expected
55+
*/
56+
public static void assertNumericEquals(double expected, double actual) {
57+
Assert.assertEquals(expected, actual, DELTA);
58+
}
59+
60+
/**
61+
* Reuses {@link Assert#assertEquals(double, double, double)} with default {@link CustomAssertions#DELTA}.
62+
* Added to prevent repetitive casts to 'float' and specifying delta. Asserts that two floats are equal to within
63+
* the delta. If they are not, an {@link AssertionError} without a message is thrown.
64+
*
65+
* @param expected expected value
66+
* @param actual the value to check against expected
67+
*/
68+
public static void assertNumericEquals(float expected, float actual) {
69+
Assert.assertEquals(expected, actual, DELTA);
70+
}
71+
}

mssql-plugin/docs/SqlServer-batchsink.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,58 @@ https://docs.microsoft.com/en-us/sql/relational-databases/system-compatibility-v
7575
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
7676
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
7777

78+
Data Types Mapping
79+
----------
80+
81+
82+
| MS SQL Data Type | CDAP Schema Data Type | Comment |
83+
| ---------------- | ---------------------- | -------------------------------------------------------------- |
84+
| BIGINT | long | |
85+
| BINARY | bytes | |
86+
| BIT | boolean | |
87+
| CHAR | string | |
88+
| DATE | date | |
89+
| DATETIME | timestamp | |
90+
| DATETIME2 | timestamp | |
91+
| DATETIMEOFFSET | string | |
92+
| DECIMAL | decimal | |
93+
| FLOAT | double | |
94+
| IMAGE | bytes | |
95+
| INT | int | |
96+
| MONEY | decimal | |
97+
| NCHAR | string | |
98+
| NTEXT | string | |
99+
| NUMERIC | decimal | |
100+
| NVARCHAR | string | |
101+
| NVARCHAR(MAX) | string | |
102+
| REAL | float | |
103+
| SMALLDATETIME | timestamp | |
104+
| SMALLINT | int | |
105+
| SMALLMONEY | decimal | |
106+
| TEXT | string | |
107+
| TIME | time | TIME data type has the accuracy of 100 nanoseconds which is |
108+
| | | not currently supported. Values of this type will be rounded |
109+
| | | to microsecond. |
110+
| TINYINT | int | |
111+
| UDT | bytes | UDT types are mapped according to the type they are an alias |
112+
| | | of. For example, is there is an 'SSN' type that was created as |
113+
| | | 'CREATE TYPE SSN FROM varchar(11);', that type would get |
114+
| | | mapped to a CDAP string. Common Language Runtime UDTs are |
115+
| | | mapped to CDAP bytes. |
116+
| UNIQUEIDENTIFIER | string | |
117+
| VARBINARY | bytes | |
118+
| VARBINARY(MAX) | bytes | |
119+
| VARCHAR | string | |
120+
| VARCHAR(MAX) | string | |
121+
| XML | string | |
122+
| SQLVARIANT | string | |
123+
| GEOMETRY | bytes | |
124+
| GEOGRAPHY | bytes | |
125+
| GEOMETRY | string | Values of this type can be set from Well Known Text strings, |
126+
| | | such as "POINT(3 40 5 6)". |
127+
| GEOGRAPHY | string | Values of this type can be set from Well Known Text strings, |
128+
| | | such as "POINT(3 40 5 6)". |
129+
78130

79131
Example
80132
-------

0 commit comments

Comments
 (0)