Skip to content

Commit c2e32cf

Browse files
committed
Support reading json and jsonb types in PostgreSQL dialect
1 parent b598054 commit c2e32cf

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public class PostgresTypeMapper implements JdbcCatalogTypeMapper {
8484
private static final String PG_CHARACTER_ARRAY = "_character";
8585
private static final String PG_CHARACTER_VARYING = "varchar";
8686
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
87+
private static final String PG_JSON = "json";
88+
private static final String PG_JSONB = "jsonb";
8789

8890
@Override
8991
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
@@ -157,6 +159,8 @@ protected DataType getMapping(String pgType, int precision, int scale) {
157159
case PG_CHARACTER_VARYING_ARRAY:
158160
return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
159161
case PG_TEXT:
162+
case PG_JSON:
163+
case PG_JSONB:
160164
return DataTypes.STRING();
161165
case PG_TEXT_ARRAY:
162166
return DataTypes.ARRAY(DataTypes.STRING());

flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectConverter.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter;
2323
import org.apache.flink.table.data.GenericArrayData;
24+
import org.apache.flink.table.data.StringData;
2425
import org.apache.flink.table.types.logical.ArrayType;
2526
import org.apache.flink.table.types.logical.LogicalType;
2627
import org.apache.flink.table.types.logical.LogicalTypeRoot;
2728
import org.apache.flink.table.types.logical.RowType;
2829
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
30+
import org.postgresql.util.PGobject;
2931

3032
import java.lang.reflect.Array;
3133

@@ -91,7 +93,19 @@ private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arra
9193
// Have its own method so that Postgres can support primitives that super class doesn't support
9294
// in the future
9395
private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) {
94-
return super.createInternalConverter(type);
96+
switch (type.getTypeRoot()) {
97+
case VARCHAR:
98+
return val -> {
99+
if (val instanceof PGobject) {
100+
PGobject obj = (PGobject) val;
101+
return StringData.fromString(obj.getValue());
102+
} else {
103+
return StringData.fromString((String) val);
104+
}
105+
};
106+
default:
107+
return super.createInternalConverter(type);
108+
}
95109
}
96110

97111
@Override

0 commit comments

Comments
 (0)