Skip to content

Commit 4f48075

Browse files
authored
Merge pull request #43 from vladhlinskiy/feature/CDAP-15519-decimal-logical-type-support
CDAP-15519 Mapping changed to use decimal logical type
2 parents 5f21181 + 8fca100 commit 4f48075

File tree

30 files changed

+257
-138
lines changed

30 files changed

+257
-138
lines changed

aurora-mysql-plugin/src/test/java/io/cdap/plugin/aurora/mysql/AuroraMysqlPluginTestBase.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.junit.BeforeClass;
4040
import org.junit.ClassRule;
4141

42+
import java.math.BigDecimal;
4243
import java.sql.Connection;
4344
import java.sql.Date;
4445
import java.sql.DriverManager;
@@ -63,6 +64,8 @@ public class AuroraMysqlPluginTestBase extends DatabasePluginTestBase {
6364

6465
protected static String connectionUrl;
6566
protected static int year;
67+
protected static final int PRECISION = 10;
68+
protected static final int SCALE = 6;
6669
protected static boolean tearDown = true;
6770
private static int startCount;
6871

@@ -135,8 +138,8 @@ protected static void createTestTables(Connection conn) throws SQLException {
135138
"BIG BIGINT, " +
136139
"FLOAT_COL FLOAT, " +
137140
"REAL_COL REAL, " +
138-
"NUMERIC_COL NUMERIC(10, 2), " +
139-
"DECIMAL_COL DECIMAL(10, 2), " +
141+
"NUMERIC_COL NUMERIC(" + PRECISION + "," + SCALE + "), " +
142+
"DECIMAL_COL DECIMAL(" + PRECISION + "," + SCALE + "), " +
140143
"BIT_COL BIT, " +
141144
"DATE_COL DATE, " +
142145
"TIME_COL TIME, " +
@@ -198,11 +201,11 @@ private static void populateData(PreparedStatement ...stmts) throws SQLException
198201
pStmt.setLong(9, (long) i);
199202
pStmt.setFloat(10, (float) 123.45 + i);
200203
pStmt.setFloat(11, (float) 123.45 + i);
201-
pStmt.setDouble(12, 123.45 + i);
204+
pStmt.setBigDecimal(12, new BigDecimal(123.45).add(new BigDecimal(i)));
202205
if ((i % 2 == 0)) {
203-
pStmt.setNull(13, Types.DOUBLE);
206+
pStmt.setNull(13, Types.DECIMAL);
204207
} else {
205-
pStmt.setDouble(13, 123.45 + i);
208+
pStmt.setBigDecimal(13, new BigDecimal(123.45).add(new BigDecimal(i)));
206209
}
207210
pStmt.setBoolean(14, (i % 2 == 1));
208211
pStmt.setDate(15, new Date(CURRENT_TS));

aurora-mysql-plugin/src/test/java/io/cdap/plugin/aurora/mysql/AuroraMysqlSinkTestRun.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.junit.Assert;
3434
import org.junit.Test;
3535

36+
import java.math.BigDecimal;
37+
import java.math.MathContext;
3638
import java.sql.Connection;
3739
import java.sql.Date;
3840
import java.sql.ResultSet;
@@ -85,6 +87,10 @@ public void testDBSink() throws Exception {
8587
Assert.assertEquals("user2", Bytes.toString(resultSet.getBytes("TINYBLOB_COL"), 0, 5));
8688
Assert.assertEquals("user2", Bytes.toString(resultSet.getBytes("MEDIUMBLOB_COL"), 0, 5));
8789
Assert.assertEquals("user2", Bytes.toString(resultSet.getBytes("LONGBLOB_COL"), 0, 5));
90+
Assert.assertEquals(new BigDecimal(3.458, new MathContext(PRECISION)).setScale(SCALE),
91+
resultSet.getBigDecimal("NUMERIC_COL"));
92+
Assert.assertEquals(new BigDecimal(3.459, new MathContext(PRECISION)).setScale(SCALE),
93+
resultSet.getBigDecimal("DECIMAL_COL"));
8894
users.add(resultSet.getString("NAME"));
8995
Assert.assertEquals(ImmutableSet.of("user1", "user2"), users);
9096
}
@@ -104,8 +110,8 @@ private void createInputData(String inputDatasetName) throws Exception {
104110
Schema.Field.of("BIG", Schema.of(Schema.Type.LONG)),
105111
Schema.Field.of("FLOAT_COL", Schema.of(Schema.Type.FLOAT)),
106112
Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
107-
Schema.Field.of("NUMERIC_COL", Schema.of(Schema.Type.DOUBLE)),
108-
Schema.Field.of("DECIMAL_COL", Schema.of(Schema.Type.DOUBLE)),
113+
Schema.Field.of("NUMERIC_COL", Schema.decimalOf(PRECISION, SCALE)),
114+
Schema.Field.of("DECIMAL_COL", Schema.decimalOf(PRECISION, SCALE)),
109115
Schema.Field.of("BIT_COL", Schema.of(Schema.Type.BOOLEAN)),
110116
Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
111117
Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
@@ -134,8 +140,8 @@ private void createInputData(String inputDatasetName) throws Exception {
134140
.set("BIG", 3456987L)
135141
.set("FLOAT_COL", 3.456f)
136142
.set("REAL_COL", 3.457f)
137-
.set("NUMERIC_COL", 3.458d)
138-
.set("DECIMAL_COL", 3.459d)
143+
.setDecimal("NUMERIC_COL", new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE))
144+
.setDecimal("DECIMAL_COL", new BigDecimal(3.459d, new MathContext(PRECISION)).setScale(SCALE))
139145
.set("BIT_COL", (i % 2 == 1))
140146
.setDate("DATE_COL", localDateTime.toLocalDate())
141147
.setTime("TIME_COL", localDateTime.toLocalTime())

aurora-mysql-plugin/src/test/java/io/cdap/plugin/aurora/mysql/AuroraMysqlSourceTestRun.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.junit.Assert;
3838
import org.junit.Test;
3939

40+
import java.math.BigDecimal;
41+
import java.math.MathContext;
4042
import java.nio.ByteBuffer;
4143
import java.sql.Date;
4244
import java.sql.Time;
@@ -155,10 +157,13 @@ public void testDBSource() throws Exception {
155157
Assert.assertEquals(125.45, (float) row2.get("FLOAT_COL"), 0.00001);
156158
Assert.assertEquals(124.45, (double) row1.get("REAL_COL"), 0.00001);
157159
Assert.assertEquals(125.45, (double) row2.get("REAL_COL"), 0.00001);
158-
Assert.assertEquals(124.45, (double) row1.get("NUMERIC_COL"), 0.000001);
159-
Assert.assertEquals(125.45, (double) row2.get("NUMERIC_COL"), 0.000001);
160-
Assert.assertEquals(124.45, (double) row1.get("DECIMAL_COL"), 0.000001);
161-
Assert.assertNull(row2.get("DECIMAL_COL"));
160+
Assert.assertEquals(new BigDecimal(124.45, new MathContext(PRECISION)).setScale(SCALE),
161+
row1.getDecimal("NUMERIC_COL"));
162+
Assert.assertEquals(new BigDecimal(125.45, new MathContext(PRECISION)).setScale(SCALE),
163+
row2.getDecimal("NUMERIC_COL"));
164+
Assert.assertEquals(new BigDecimal(124.45, new MathContext(PRECISION)).setScale(SCALE),
165+
row1.getDecimal("DECIMAL_COL"));
166+
Assert.assertNull(row2.getDecimal("DECIMAL_COL"));
162167
Assert.assertTrue((boolean) row1.get("BIT_COL"));
163168
Assert.assertFalse((boolean) row2.get("BIT_COL"));
164169
// Verify time columns

aurora-postgresql-plugin/src/test/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresPluginTestBase.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.junit.ClassRule;
3737
import org.postgresql.Driver;
3838

39+
import java.math.BigDecimal;
3940
import java.sql.Connection;
4041
import java.sql.Date;
4142
import java.sql.DriverManager;
@@ -58,6 +59,8 @@ public class AuroraPostgresPluginTestBase extends DatabasePluginTestBase {
5859

5960
protected static String connectionUrl;
6061
protected static int year;
62+
protected static final int PRECISION = 10;
63+
protected static final int SCALE = 6;
6164
protected static boolean tearDown = true;
6265
private static int startCount;
6366

@@ -124,8 +127,8 @@ protected static void createTestTables(Connection conn) throws SQLException {
124127
"\"NOT_IMPORTED\" VARCHAR(30)," +
125128
"\"SMALLINT_COL\" SMALLINT," +
126129
"\"BIG\" BIGINT," +
127-
"\"NUMERIC_COL\" NUMERIC(10, 2)," +
128-
"\"DECIMAL_COL\" DECIMAL(10, 2)," +
130+
"\"NUMERIC_COL\" NUMERIC(" + PRECISION + "," + SCALE + ")," +
131+
"\"DECIMAL_COL\" DECIMAL(" + PRECISION + "," + SCALE + ")," +
129132
"\"DOUBLE_PREC_COL\" DOUBLE PRECISION," +
130133
"\"DATE_COL\" DATE," +
131134
"\"TIME_COL\" TIME," +
@@ -172,8 +175,8 @@ private static void populateData(PreparedStatement ...stmts) throws SQLException
172175
pStmt.setString(5, "random" + i);
173176
pStmt.setShort(6, (short) i);
174177
pStmt.setLong(7, (long) i);
175-
pStmt.setFloat(8, (float) 123.45 + i);
176-
pStmt.setFloat(9, (float) 123.45 + i);
178+
pStmt.setBigDecimal(8, new BigDecimal(123.45).add(new BigDecimal(i)));
179+
pStmt.setBigDecimal(9, new BigDecimal(123.45).add(new BigDecimal(i)));
177180
pStmt.setDouble(10, 123.45 + i);
178181
pStmt.setDate(11, new Date(CURRENT_TS));
179182
pStmt.setTime(12, new Time(CURRENT_TS));

aurora-postgresql-plugin/src/test/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSinkTestRun.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.junit.Assert;
3131
import org.junit.Test;
3232

33+
import java.math.BigDecimal;
34+
import java.math.MathContext;
3335
import java.sql.Connection;
3436
import java.sql.Date;
3537
import java.sql.ResultSet;
@@ -79,6 +81,10 @@ public void testDBSink() throws Exception {
7981
resultSet.getTimestamp("TIMESTAMP_COL"));
8082
Assert.assertTrue(resultSet.next());
8183
Assert.assertArrayEquals("user2".getBytes(), resultSet.getBytes("BYTEA_COL"));
84+
Assert.assertEquals(new BigDecimal(3.458, new MathContext(PRECISION)).setScale(SCALE),
85+
resultSet.getBigDecimal("NUMERIC_COL"));
86+
Assert.assertEquals(new BigDecimal(3.459, new MathContext(PRECISION)).setScale(SCALE),
87+
resultSet.getBigDecimal("DECIMAL_COL"));
8288
users.add(resultSet.getString("NAME"));
8389
Assert.assertEquals(ImmutableSet.of("user1", "user2"), users);
8490

@@ -96,8 +102,8 @@ private void createInputData(String inputDatasetName) throws Exception {
96102
Schema.Field.of("GRADUATED", Schema.of(Schema.Type.BOOLEAN)),
97103
Schema.Field.of("SMALLINT_COL", Schema.of(Schema.Type.INT)),
98104
Schema.Field.of("BIG", Schema.of(Schema.Type.LONG)),
99-
Schema.Field.of("NUMERIC_COL", Schema.of(Schema.Type.DOUBLE)),
100-
Schema.Field.of("DECIMAL_COL", Schema.of(Schema.Type.DOUBLE)),
105+
Schema.Field.of("NUMERIC_COL", Schema.decimalOf(PRECISION, SCALE)),
106+
Schema.Field.of("DECIMAL_COL", Schema.decimalOf(PRECISION, SCALE)),
101107
Schema.Field.of("DOUBLE_PREC_COL", Schema.of(Schema.Type.DOUBLE)),
102108
Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
103109
Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
@@ -117,8 +123,8 @@ private void createInputData(String inputDatasetName) throws Exception {
117123
.set("GRADUATED", (i % 2 == 0))
118124
.set("SMALLINT_COL", i + 2)
119125
.set("BIG", 3456987L)
120-
.set("NUMERIC_COL", 3.458d)
121-
.set("DECIMAL_COL", 3.459d)
126+
.setDecimal("NUMERIC_COL", new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE))
127+
.setDecimal("DECIMAL_COL", new BigDecimal(3.459d, new MathContext(PRECISION)).setScale(SCALE))
122128
.set("DOUBLE_PREC_COL", 3.459d)
123129
.setDate("DATE_COL", localDateTime.toLocalDate())
124130
.setTime("TIME_COL", localDateTime.toLocalTime())

aurora-postgresql-plugin/src/test/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSourceTestRun.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.junit.Assert;
3838
import org.junit.Test;
3939

40+
import java.math.BigDecimal;
41+
import java.math.MathContext;
4042
import java.nio.ByteBuffer;
4143
import java.sql.Date;
4244
import java.sql.Time;
@@ -139,9 +141,12 @@ public void testDBSource() throws Exception {
139141
Assert.assertEquals(1, (long) row1.get("BIG"));
140142
Assert.assertEquals(2, (long) row2.get("BIG"));
141143

142-
Assert.assertEquals(124.45, (double) row1.get("NUMERIC_COL"), 0.000001);
143-
Assert.assertEquals(125.45, (double) row2.get("NUMERIC_COL"), 0.000001);
144-
Assert.assertEquals(124.45, (double) row1.get("DECIMAL_COL"), 0.000001);
144+
Assert.assertEquals(new BigDecimal(124.45, new MathContext(PRECISION)).setScale(SCALE),
145+
row1.getDecimal("NUMERIC_COL"));
146+
Assert.assertEquals(new BigDecimal(125.45, new MathContext(PRECISION)).setScale(SCALE),
147+
row2.getDecimal("NUMERIC_COL"));
148+
Assert.assertEquals(new BigDecimal(124.45, new MathContext(PRECISION)).setScale(SCALE),
149+
row1.getDecimal("DECIMAL_COL"));
145150

146151
Assert.assertEquals(124.45, (double) row1.get("DOUBLE_PREC_COL"), 0.000001);
147152
Assert.assertEquals(125.45, (double) row2.get("DOUBLE_PREC_COL"), 0.000001);

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,15 @@
1616

1717
package io.cdap.plugin.db;
1818

19-
import com.google.common.base.Strings;
2019
import com.google.common.collect.Lists;
2120
import io.cdap.cdap.api.data.schema.Schema;
2221
import io.cdap.cdap.api.data.schema.UnsupportedTypeException;
2322

24-
import java.io.IOException;
2523
import java.sql.ResultSet;
2624
import java.sql.ResultSetMetaData;
2725
import java.sql.SQLException;
2826
import java.sql.Types;
2927
import java.util.List;
30-
import javax.annotation.Nullable;
3128

3229
/**
3330
* Common schema reader for mapping non specific DB types.
@@ -90,11 +87,7 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
9087
case Types.DECIMAL:
9188
int precision = metadata.getPrecision(index); // total number of digits
9289
int scale = metadata.getScale(index); // digits after the decimal point
93-
// if there are no digits after the point, use integer types
94-
type = scale != 0 ? Schema.Type.DOUBLE :
95-
// with 10 digits we can represent 2^32 and LONG is required
96-
precision > 9 ? Schema.Type.LONG : Schema.Type.INT;
97-
break;
90+
return Schema.decimalOf(precision, scale);
9891

9992
case Types.DOUBLE:
10093
type = Schema.Type.DOUBLE;

database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.io.DataInput;
3131
import java.io.DataOutput;
3232
import java.io.IOException;
33+
import java.math.BigDecimal;
3334
import java.nio.ByteBuffer;
3435
import java.sql.Date;
3536
import java.sql.PreparedStatement;
@@ -148,6 +149,8 @@ protected void setField(ResultSet resultSet, StructuredRecord.Builder recordBuil
148149
} else if (o instanceof Timestamp) {
149150
Instant instant = ((Timestamp) o).toInstant();
150151
recordBuilder.setTimestamp(field.getName(), instant.atZone(ZoneId.ofOffset("UTC", ZoneOffset.UTC)));
152+
} else if (o instanceof BigDecimal) {
153+
recordBuilder.setDecimal(field.getName(), (BigDecimal) o);
151154
} else {
152155
recordBuilder.set(field.getName(), o);
153156
}
@@ -287,6 +290,9 @@ private void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldInde
287290
case TIMESTAMP_MICROS:
288291
stmt.setTimestamp(sqlIndex, Timestamp.from(record.getTimestamp(fieldName).toInstant()));
289292
break;
293+
case DECIMAL:
294+
stmt.setBigDecimal(sqlIndex, record.getDecimal(fieldName));
295+
break;
290296
}
291297
return;
292298
}

database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.lang.reflect.Field;
3535
import java.lang.reflect.Method;
3636
import java.math.BigDecimal;
37+
import java.math.MathContext;
3738
import java.sql.Blob;
3839
import java.sql.Clob;
3940
import java.sql.Driver;
@@ -212,11 +213,7 @@ public static Schema getSchema(int sqlType, int precision, int scale) throws SQL
212213

213214
case Types.NUMERIC:
214215
case Types.DECIMAL:
215-
// if there are no digits after the point, use integer types
216-
type = scale != 0 ? Schema.Type.DOUBLE :
217-
// with 10 digits we can represent 2^32 and LONG is required
218-
precision > 9 ? Schema.Type.LONG : Schema.Type.INT;
219-
break;
216+
return Schema.decimalOf(precision, scale);
220217

221218
case Types.DOUBLE:
222219
type = Schema.Type.DOUBLE;
@@ -262,15 +259,7 @@ public static Object transformValue(int sqlType, int precision, int scale,
262259
case Types.NUMERIC:
263260
case Types.DECIMAL:
264261
BigDecimal decimal = (BigDecimal) original;
265-
if (scale != 0) {
266-
// if there are digits after the point, use double types
267-
return decimal.doubleValue();
268-
} else if (precision > 9) {
269-
// with 10 digits we can represent 2^32 and LONG is required
270-
return decimal.longValue();
271-
} else {
272-
return decimal.intValue();
273-
}
262+
return new BigDecimal(decimal.unscaledValue(), scale, new MathContext(precision));
274263
case Types.DATE:
275264
return resultSet.getDate(columnIndex);
276265
case Types.TIME:

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2PluginTestBase.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public class Db2PluginTestBase extends DatabasePluginTestBase {
5959

6060
protected static String connectionUrl;
6161
protected static int year;
62+
protected static final int PRECISION = 10;
63+
protected static final int SCALE = 6;
6264
protected static boolean tearDown = true;
6365
private static int startCount;
6466

@@ -128,8 +130,8 @@ protected static void createTestTables(Connection conn) throws SQLException {
128130
" SMALLINT_COL SMALLINT," +
129131
" INTEGER_COL INTEGER," +
130132
" BIGINT_COL BIGINT," +
131-
" DECIMAL_COL DECIMAL(10, 2)," +
132-
" NUMERIC_COL NUMERIC(10, 2)," +
133+
" DECIMAL_COL DECIMAL(" + PRECISION + "," + SCALE + ")," +
134+
" NUMERIC_COL NUMERIC(" + PRECISION + "," + SCALE + ")," +
133135
" DECFLOAT_COL DECFLOAT," +
134136
" REAL_COL REAL," +
135137
" DOUBLE_COL DOUBLE," +
@@ -176,9 +178,9 @@ private static void populateData(PreparedStatement ...stmts) throws SQLException
176178
pStmt.setShort(1, (short) i);
177179
pStmt.setInt(2, i);
178180
pStmt.setLong(3, (long) i);
179-
pStmt.setBigDecimal(4, new BigDecimal(i + 3.14));
180-
pStmt.setBigDecimal(5, new BigDecimal(i + 3.14));
181-
pStmt.setBigDecimal(6, new BigDecimal(i + 3.14));
181+
pStmt.setBigDecimal(4, new BigDecimal(3.14).add(new BigDecimal(i)));
182+
pStmt.setBigDecimal(5, new BigDecimal(3.14).add(new BigDecimal(i)));
183+
pStmt.setBigDecimal(6, new BigDecimal(3.14).add(new BigDecimal(i)));
182184
pStmt.setFloat(7, i + 3.14f);
183185
pStmt.setDouble(8, i + 3.14);
184186
pStmt.setString(9, name);

0 commit comments

Comments
 (0)