Skip to content

Commit 3c1accd

Browse files
authored
Merge pull request #443 from data-integrations/cloudSQL_PrecisionFix_1.10
Postgres BugFix 1.10 Release
2 parents 5935736 + 8499c16 commit 3c1accd

File tree

10 files changed

+173
-18
lines changed

10 files changed

+173
-18
lines changed

cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
148148
| double precision | double | |
149149
| integer | int | |
150150
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
151+
| numeric(with 0 precision) | string | |
151152
| real | float | |
152153
| smallint | int | |
153154
| text | string | |

cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
172172
| double precision | double | |
173173
| integer | int | |
174174
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
175+
| numeric(with 0 precision) | string | |
175176
| real | float | |
176177
| smallint | int | |
177178
| smallserial | int | |

postgresql-plugin/docs/Postgres-batchsink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
7979
| double precision | double | |
8080
| integer | int | |
8181
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
82+
| numeric(with 0 precision) | string | |
8283
| real | float | |
8384
| smallint | int | |
8485
| text | string | |

postgresql-plugin/docs/Postgres-batchsource.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
110110
| double precision | double | |
111111
| integer | int | |
112112
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
113+
| numeric(with 0 precision) | string | |
113114
| real | float | |
114115
| smallint | int | |
115116
| smallserial | int | |

postgresql-plugin/pom.xml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,6 @@
5353
<version>42.2.20</version>
5454
<scope>test</scope>
5555
</dependency>
56-
<dependency>
57-
<groupId>org.mockito</groupId>
58-
<artifactId>mockito-core</artifactId>
59-
</dependency>
6056
<dependency>
6157
<groupId>io.cdap.plugin</groupId>
6258
<artifactId>database-commons</artifactId>
@@ -72,9 +68,15 @@
7268
<groupId>io.cdap.cdap</groupId>
7369
<artifactId>cdap-data-pipeline3_2.12</artifactId>
7470
</dependency>
71+
<dependency>
72+
<groupId>org.mockito</groupId>
73+
<artifactId>mockito-core</artifactId>
74+
<scope>test</scope>
75+
</dependency>
7576
<dependency>
7677
<groupId>junit</groupId>
7778
<artifactId>junit</artifactId>
79+
<scope>test</scope>
7880
</dependency>
7981
<dependency>
8082
<groupId>io.cdap.cdap</groupId>

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424

2525
import java.lang.reflect.InvocationTargetException;
2626
import java.lang.reflect.Method;
27+
import java.math.BigDecimal;
28+
import java.math.RoundingMode;
2729
import java.sql.PreparedStatement;
2830
import java.sql.ResultSet;
2931
import java.sql.ResultSetMetaData;
3032
import java.sql.SQLException;
33+
import java.sql.Types;
3134
import java.util.List;
3235

3336
/**
@@ -49,24 +52,41 @@ public PostgresDBRecord() {
4952
@Override
5053
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
5154
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
55+
ResultSetMetaData metadata = resultSet.getMetaData();
5256
if (isUseSchema(resultSet.getMetaData(), columnIndex)) {
5357
setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
54-
} else {
55-
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
58+
return;
5659
}
60+
int columnType = metadata.getColumnType(columnIndex);
61+
if (columnType == Types.NUMERIC) {
62+
Schema nonNullableSchema = field.getSchema().isNullable() ?
63+
field.getSchema().getNonNullable() : field.getSchema();
64+
int precision = metadata.getPrecision(columnIndex);
65+
if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) {
66+
// When the output schema is set to String for a numeric column without precision
67+
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
68+
return;
69+
}
70+
BigDecimal orgValue = resultSet.getBigDecimal(columnIndex);
71+
if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType()) && orgValue != null) {
72+
BigDecimal decimalValue = new BigDecimal(orgValue.toPlainString())
73+
.setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN);
74+
recordBuilder.setDecimal(field.getName(), decimalValue);
75+
return;
76+
}
77+
}
78+
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
5779
}
5880

5981
private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
60-
switch (metadata.getColumnTypeName(columnIndex)) {
61-
case "bit":
62-
case "timetz":
63-
case "money":
64-
return true;
65-
default:
66-
return PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex));
67-
}
82+
String columnTypeName = metadata.getColumnTypeName(columnIndex);
83+
// Return true if the column type name or the JDBC column type is one of the String-mapped PostgreSQL types.
84+
return (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnTypeName)
85+
|| PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex)));
86+
6887
}
6988

89+
7090
private Object createPGobject(String type, String value, ClassLoader classLoader) throws SQLException {
7191
try {
7292
Class pGObjectClass = classLoader.loadClass("org.postgresql.util.PGobject");
@@ -89,11 +109,17 @@ protected void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldIn
89109
if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnType.getTypeName()) ||
90110
PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(columnType.getType())) {
91111
stmt.setObject(sqlIndex, createPGobject(columnType.getTypeName(),
92-
record.get(field.getName()),
93-
stmt.getClass().getClassLoader()));
94-
} else {
95-
super.writeToDB(stmt, field, fieldIndex);
112+
record.get(field.getName()),
113+
stmt.getClass().getClassLoader()));
114+
return;
115+
}
116+
if (columnType.getType() == Types.NUMERIC && record.get(field.getName()) != null &&
117+
field.getSchema().getType() == Schema.Type.STRING) {
118+
stmt.setBigDecimal(sqlIndex, new BigDecimal((String) record.get(field.getName())));
119+
return;
96120
}
121+
122+
super.writeToDB(stmt, field, fieldIndex);
97123
}
98124

99125
@Override

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresFieldsValidator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.sql.ResultSetMetaData;
2323
import java.sql.SQLException;
24+
import java.sql.Types;
2425
import java.util.Objects;
2526

2627
/**
@@ -45,6 +46,13 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
4546
return false;
4647
}
4748
}
49+
// Numeric types without precision and scale are converted into the CDAP String type by the Source
50+
// plugin, so the String type is made compatible with the Numeric type at the Sink as well.
51+
if (fieldType.equals(Schema.Type.STRING)) {
52+
if (Types.NUMERIC == columnType) {
53+
return true;
54+
}
55+
}
4856

4957
return super.isFieldCompatible(field, metadata, index);
5058
}

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import io.cdap.cdap.api.data.schema.Schema;
2121
import io.cdap.plugin.db.CommonSchemaReader;
2222

23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
2326
import java.sql.ResultSetMetaData;
2427
import java.sql.SQLException;
2528
import java.sql.Types;
@@ -30,6 +33,8 @@
3033
*/
3134
public class PostgresSchemaReader extends CommonSchemaReader {
3235

36+
private static final Logger LOG = LoggerFactory.getLogger(PostgresSchemaReader.class);
37+
3338
public static final Set<Integer> STRING_MAPPED_POSTGRES_TYPES = ImmutableSet.of(
3439
Types.OTHER, Types.ARRAY, Types.SQLXML
3540
);
@@ -57,6 +62,17 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
5762
if (STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(typeName) || STRING_MAPPED_POSTGRES_TYPES.contains(columnType)) {
5863
return Schema.of(Schema.Type.STRING);
5964
}
65+
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
66+
if (Types.NUMERIC == columnType) {
67+
int precision = metadata.getPrecision(index);
68+
if (precision == 0) {
69+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
70+
+ "converting into STRING type to avoid any precision loss.",
71+
metadata.getColumnName(index),
72+
metadata.getColumnTypeName(index)));
73+
return Schema.of(Schema.Type.STRING);
74+
}
75+
}
6076

6177
return super.getSchema(metadata, index);
6278
}

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.cdap.cdap.api.annotation.MetadataProperty;
2424
import io.cdap.cdap.api.annotation.Name;
2525
import io.cdap.cdap.api.annotation.Plugin;
26+
import io.cdap.cdap.api.data.schema.Schema;
2627
import io.cdap.cdap.etl.api.FailureCollector;
2728
import io.cdap.cdap.etl.api.batch.BatchSource;
2829
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
@@ -137,5 +138,22 @@ public void validate(FailureCollector collector) {
137138
ConfigUtil.validateConnection(this, useConnection, connection, collector);
138139
super.validate(collector);
139140
}
141+
142+
@Override
143+
protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema,
144+
Schema expectedFieldSchema) {
145+
146+
// This override is needed so that existing pipelines continue to work after a plugin upgrade.
147+
// The older handling of precision-less numeric columns converted them to the decimal type,
148+
// whereas the new version converts them to the String type. In that case the output schema would
149+
// contain Decimal(38, 0) (or something similar), while the code internally identifies
150+
// the schema of the field (without precision and scale) as String, so that combination is accepted.
151+
if (Schema.LogicalType.DECIMAL.equals(expectedFieldSchema.getLogicalType()) &&
152+
actualFieldSchema.getType().equals(Schema.Type.STRING)) {
153+
return;
154+
}
155+
super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);
156+
}
140157
}
141158
}
159+
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright © 2023 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package io.cdap.plugin.postgres;
17+
18+
import io.cdap.cdap.api.data.format.StructuredRecord;
19+
import io.cdap.cdap.api.data.schema.Schema;
20+
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
import org.mockito.Mockito;
25+
import org.mockito.junit.MockitoJUnitRunner;
26+
27+
import java.math.BigDecimal;
28+
import java.sql.ResultSet;
29+
import java.sql.ResultSetMetaData;
30+
import java.sql.Types;
31+
32+
import static org.mockito.ArgumentMatchers.eq;
33+
import static org.mockito.Mockito.when;
34+
35+
36+
@RunWith(MockitoJUnitRunner.class)
37+
public class PostgresDBRecordUnitTest {
38+
39+
private static final int DEFAULT_PRECISION = 38;
40+
41+
/**
42+
* Validates the handling of precision-less numbers against the following use cases.
43+
* 1. Ensure that a numeric type with [p,s] set as [38,4] is detected as BigDecimal(38,4) in CDAP.
44+
* 2. Ensure that a numeric type without [p,s] is detected as String type in CDAP.
45+
* @throws Exception
46+
*/
47+
@Test
48+
public void validatePrecisionLessDecimalParsing() throws Exception {
49+
Schema.Field field1 = Schema.Field.of("ID1", Schema.decimalOf(DEFAULT_PRECISION, 4));
50+
Schema.Field field2 = Schema.Field.of("ID2", Schema.of(Schema.Type.STRING));
51+
52+
Schema schema = Schema.recordOf(
53+
"dbRecord",
54+
field1,
55+
field2
56+
);
57+
58+
ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
59+
when(resultSetMetaData.getColumnType(eq(1))).thenReturn(Types.NUMERIC);
60+
when(resultSetMetaData.getPrecision(eq(1))).thenReturn(DEFAULT_PRECISION);
61+
when(resultSetMetaData.getColumnType(eq(2))).thenReturn(Types.NUMERIC);
62+
when(resultSetMetaData.getPrecision(eq(2))).thenReturn(0);
63+
64+
ResultSet resultSet = Mockito.mock(ResultSet.class);
65+
66+
when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
67+
when(resultSet.getBigDecimal(eq(1))).thenReturn(BigDecimal.valueOf(123.4568));
68+
when(resultSet.getString(eq(2))).thenReturn("123.4568");
69+
70+
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
71+
PostgresDBRecord dbRecord = new PostgresDBRecord(null, null);
72+
dbRecord.handleField(resultSet, builder, field1, 1, Types.NUMERIC, DEFAULT_PRECISION, 4);
73+
dbRecord.handleField(resultSet, builder, field2, 2, Types.NUMERIC, 0, -127);
74+
75+
StructuredRecord record = builder.build();
76+
Assert.assertTrue(record.getDecimal("ID1") instanceof BigDecimal);
77+
Assert.assertEquals(record.getDecimal("ID1"), BigDecimal.valueOf(123.4568));
78+
Assert.assertTrue(record.get("ID2") instanceof String);
79+
Assert.assertEquals(record.get("ID2"), "123.4568");
80+
}
81+
}

0 commit comments

Comments
 (0)