Skip to content

Commit 3b2820c

Browse files
authored
Merge pull request #50 from vladhlinskiy/feature/CDAP-15547-postgress-all-data-types-2
CDAP-15547 Postgres db plugin enhacements: all data types support + proper test coverage
2 parents ebecc7d + b2f9a58 commit 3b2820c

File tree

11 files changed

+503
-78
lines changed

11 files changed

+503
-78
lines changed

database-commons/src/main/java/io/cdap/plugin/db/ColumnType.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,24 @@
2121
*/
2222
public class ColumnType {
2323

24-
private final String name;
25-
private final int type;
24+
private String name;
25+
private String typeName;
26+
private int type;
2627

27-
public ColumnType(String name, int type) {
28+
public ColumnType(String name, String typeName, int type) {
2829
this.name = name;
30+
this.typeName = typeName;
2931
this.type = type;
3032
}
3133

3234
public String getName() {
3335
return name;
3436
}
3537

38+
public String getTypeName() {
39+
return typeName;
40+
}
41+
3642
public int getType() {
3743
return type;
3844
}

database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ protected void setField(ResultSet resultSet, StructuredRecord.Builder recordBuil
157157

158158
protected void setFieldAccordingToSchema(ResultSet resultSet, StructuredRecord.Builder recordBuilder,
159159
Schema.Field field, int columnIndex) throws SQLException {
160-
161-
Schema.Type fieldType = field.getSchema().getType();
160+
Schema.Type fieldType = field.getSchema().isNullable() ? field.getSchema().getNonNullable().getType()
161+
: field.getSchema().getType();
162162

163163
switch (fieldType) {
164164
case NULL:

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,10 @@
6565
import java.util.Collections;
6666
import java.util.HashSet;
6767
import java.util.List;
68-
import java.util.Map;
6968
import java.util.Objects;
7069
import java.util.Optional;
7170
import java.util.Properties;
7271
import java.util.Set;
73-
import java.util.TreeMap;
7472
import java.util.stream.Collectors;
7573

7674
/**
@@ -235,7 +233,7 @@ public void setColumns(List<String> columns) {
235233
}
236234

237235
private void setResultSetMetadata() throws Exception {
238-
Map<String, Integer> columnToType = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
236+
List<ColumnType> columnTypes = new ArrayList<>(columns.size());
239237
String connectionString = dbSinkConfig.getConnectionString();
240238

241239
driverCleanup = DBUtils.ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.jdbcPluginName);
@@ -255,18 +253,17 @@ private void setResultSetMetadata() throws Exception {
255253
// JDBC driver column indices start with 1
256254
for (int i = 0; i < rs.getMetaData().getColumnCount(); i++) {
257255
String name = resultSetMetadata.getColumnName(i + 1);
256+
String columnTypeName = resultSetMetadata.getColumnTypeName(i + 1);
258257
int type = resultSetMetadata.getColumnType(i + 1);
259-
columnToType.put(name, type);
258+
String schemaColumnName = columns.get(i);
259+
Preconditions.checkArgument(schemaColumnName.toLowerCase().equals(name.toLowerCase()),
260+
"Missing column '%s' in SQL table", schemaColumnName);
261+
columnTypes.add(new ColumnType(schemaColumnName, columnTypeName, type));
260262
}
261263
}
262264
}
263265

264-
this.columnTypes = columns.stream()
265-
.map(name -> {
266-
Preconditions.checkArgument(columnToType.containsKey(name), "Missing column '%s' in SQL table", name);
267-
return new ColumnType(name, columnToType.get(name));
268-
})
269-
.collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
266+
this.columnTypes = Collections.unmodifiableList(columnTypes);
270267
}
271268

272269
private void validateSchema(Class<? extends Driver> jdbcDriverClass, String tableName, Schema inputSchema) {

postgresql-plugin/docs/Postgres-batchsink.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,47 @@ Table Name: "users"
5656
Username: "root"
5757
Password: "root"
5858
```
59+
Data Types Mapping
60+
------
61+
All PostgreSQL specific data types mapped to string and can have multiple input formats and one 'canonical' output form.
62+
Please, refer to PostgreSQL data types documentation to figure out proper formats.
63+
64+
| PostgreSQL Data Type | CDAP Schema Data Type | Comment |
65+
|-----------------------------------------------------|-----------------------|----------------------------------------------|
66+
| bigint | long | |
67+
| bit(n) | string | string with '0' and '1' chars exact n length |
68+
| bit varying(n) | string | string with '0' and '1' chars max n length |
69+
| boolean | boolean | |
70+
| bytea | bytes | |
71+
| character | string | |
72+
| character varying | string | |
73+
| double precision | double | |
74+
| integer | int | |
75+
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
76+
| real | float | |
77+
| smallint | int | |
78+
| text | string | |
79+
| date | date | |
80+
| time [ (p) ] [ without time zone ] | time | |
81+
| time [ (p) ] with time zone | string | |
82+
| timestamp [ (p) ] [ without time zone ] | timestamp | |
83+
| timestamp [ (p) ] with time zone | timestamp | stored in UTC format in database |
84+
| xml | string | |
85+
| tsquery | string | |
86+
| tsvector | string | |
87+
| uuid | string | |
88+
| box | string | |
89+
| cidr | string | |
90+
| circle | string | |
91+
| inet | string | |
92+
| interval | string | |
93+
| json | string | |
94+
| jsonb | string | |
95+
| line | string | |
96+
| lseg | string | |
97+
| macaddr | string | |
98+
| macaddr8 | string | |
99+
| money | string | |
100+
| path | string | |
101+
| point | string | |
102+
| polygon | string | |

postgresql-plugin/docs/Postgres-batchsource.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,51 @@ non-nullable varchars, output records will have this schema:
8383
| name | string |
8484
| email | string |
8585
| phone | string |
86+
87+
Data Types Mapping
88+
------
89+
All PostgreSQL specific data types mapped to string and can have multiple input formats and one 'canonical' output form.
90+
Please, refer to PostgreSQL data types documentation to figure out proper formats.
91+
92+
| PostgreSQL Data Type | CDAP Schema Data Type | Comment |
93+
|-----------------------------------------------------|-----------------------|----------------------------------------------|
94+
| bigint | long | |
95+
| bigserial | long | |
96+
| bit(n) | string | string with '0' and '1' chars exact n length |
97+
| bit varying(n) | string | string with '0' and '1' chars max n length |
98+
| boolean | boolean | |
99+
| bytea | bytes | |
100+
| character | string | |
101+
| character varying | string | |
102+
| double precision | double | |
103+
| integer | int | |
104+
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
105+
| real | float | |
106+
| smallint | int | |
107+
| smallserial | int | |
108+
| serial | int | |
109+
| text | string | |
110+
| date | date | |
111+
| time [ (p) ] [ without time zone ] | time | |
112+
| time [ (p) ] with time zone | string | |
113+
| timestamp [ (p) ] [ without time zone ] | timestamp | |
114+
| timestamp [ (p) ] with time zone | timestamp | stored in UTC format in database |
115+
| xml | string | |
116+
| tsquery | string | |
117+
| tsvector | string | |
118+
| uuid | string | |
119+
| box | string | |
120+
| cidr | string | |
121+
| circle | string | |
122+
| inet | string | |
123+
| interval | string | |
124+
| json | string | |
125+
| jsonb | string | |
126+
| line | string | |
127+
| lseg | string | |
128+
| macaddr | string | |
129+
| macaddr8 | string | |
130+
| money | string | |
131+
| path | string | |
132+
| point | string | |
133+
| polygon | string | |

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
import io.cdap.plugin.db.DBRecord;
2323
import io.cdap.plugin.db.SchemaReader;
2424

25+
import java.lang.reflect.InvocationTargetException;
26+
import java.lang.reflect.Method;
27+
import java.sql.PreparedStatement;
2528
import java.sql.ResultSet;
29+
import java.sql.ResultSetMetaData;
2630
import java.sql.SQLException;
2731
import java.util.List;
2832

@@ -39,21 +43,57 @@ public PostgresDBRecord(StructuredRecord record, List<ColumnType> columnTypes) {
3943
* Used in map-reduce. Do not remove.
4044
*/
4145
@SuppressWarnings("unused")
42-
public PostgresDBRecord() {}
46+
public PostgresDBRecord() {
47+
}
4348

4449
@Override
4550
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
4651
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
47-
if (PostgresSchemaReader.POSTGRES_TYPES.contains(sqlType)) {
48-
handleSpecificType(resultSet, recordBuilder, field, columnIndex);
52+
if (isUseSchema(resultSet.getMetaData(), columnIndex)) {
53+
setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
4954
} else {
5055
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
5156
}
5257
}
5358

54-
private void handleSpecificType(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
55-
int columnIndex) throws SQLException {
56-
setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
59+
private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
60+
switch (metadata.getColumnTypeName(columnIndex)) {
61+
case "bit":
62+
case "timetz":
63+
case "money":
64+
return true;
65+
default:
66+
return PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex));
67+
}
68+
}
69+
70+
private Object createPGobject(String type, String value, ClassLoader classLoader) throws SQLException {
71+
try {
72+
Class pGObjectClass = classLoader.loadClass("org.postgresql.util.PGobject");
73+
Method setTypeMethod = pGObjectClass.getMethod("setType", String.class);
74+
Method setValueMethod = pGObjectClass.getMethod("setValue", String.class);
75+
Object result = pGObjectClass.newInstance();
76+
setTypeMethod.invoke(result, type);
77+
setValueMethod.invoke(result, value);
78+
return result;
79+
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException |
80+
InvocationTargetException e) {
81+
throw new SQLException("Failed to create instance of org.postgresql.util.PGobject");
82+
}
83+
}
84+
85+
@Override
86+
protected void writeToDB(PreparedStatement stmt, Schema.Field field, int fieldIndex) throws SQLException {
87+
int sqlIndex = fieldIndex + 1;
88+
ColumnType columnType = columnTypes.get(fieldIndex);
89+
if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnType.getTypeName()) ||
90+
PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(columnType.getType())) {
91+
stmt.setObject(sqlIndex, createPGobject(columnType.getTypeName(),
92+
record.get(field.getName()),
93+
stmt.getClass().getClassLoader()));
94+
} else {
95+
super.writeToDB(stmt, field, fieldIndex);
96+
}
5797
}
5898

5999
@Override

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,20 @@
3030
*/
3131
public class PostgresSchemaReader extends CommonSchemaReader {
3232

33-
public static final Set<Integer> POSTGRES_TYPES = ImmutableSet.of(
33+
public static final Set<Integer> STRING_MAPPED_POSTGRES_TYPES = ImmutableSet.of(
3434
Types.OTHER, Types.ARRAY, Types.SQLXML
3535
);
3636

37+
public static final Set<String> STRING_MAPPED_POSTGRES_TYPES_NAMES = ImmutableSet.of(
38+
"bit", "timetz", "money"
39+
);
40+
3741
@Override
3842
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
39-
if (POSTGRES_TYPES.contains(metadata.getColumnType(index))) {
43+
String typeName = metadata.getColumnTypeName(index);
44+
int columnType = metadata.getColumnType(index);
45+
46+
if (STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(typeName) || STRING_MAPPED_POSTGRES_TYPES.contains(columnType)) {
4047
return Schema.of(Schema.Type.STRING);
4148
}
4249

0 commit comments

Comments
 (0)