diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 2ed47cfea92..ca76905ae37 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -290,7 +290,6 @@ pipeline:
## 数据类型映射
-
@@ -401,11 +622,11 @@ PostgreSQL通过PostGIS扩展支持空间数据类型:
GEOMETRY(POINT, xx)
- {"hexewkb":"0101000020730c00001c7c613255de6540787aa52c435c42c0","srid":3187}
+ {"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}"
GEOGRAPHY(MULTILINESTRING)
- {"hexewkb":"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0","srid":4326}
+ {"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326}
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index 7580028787c..e52fa34bbe5 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -285,7 +285,6 @@ Notice:
## Data Type Mapping
-
@@ -295,6 +294,17 @@ Notice:
+
+
+ BOOLEAN
+ BIT(1)
+ BOOLEAN
+
+
+
+ BIT( > 1)
+ BYTES
+
SMALLINT
@@ -312,69 +322,284 @@ Notice:
BIGINT
- BIGSERIAL
+ BIGSERIAL
+ OID
+
BIGINT
-
- NUMERIC
- DECIMAL(20, 0)
-
REAL
- FLOAT4
+ FLOAT4
+
FLOAT
-
-
- FLOAT8
- DOUBLE PRECISION
- DOUBLE
-
-
-
- NUMERIC(p, s)
- DECIMAL(p, s)
- DECIMAL(p, s)
+
+ NUMERIC
+ DECIMAL(38, 0)
- BOOLEAN
- BOOLEAN
+ DOUBLE PRECISION
+ FLOAT8
+
+ DOUBLE
+
+ CHAR[(M)]
+ VARCHAR[(M)]
+ CHARACTER[(M)]
+ BPCHAR[(M)]
+ CHARACTER VARYING[(M)]
+
+ STRING
+
- DATE
- DATE
+ TIMESTAMPTZ
+ TIMESTAMP WITH TIME ZONE
+ ZonedTimestampType
- TIME [(p)] [WITHOUT TIMEZONE]
- TIME [(p)] [WITHOUT TIMEZONE]
+ INTERVAL [P]
+ BIGINT
- TIMESTAMP [(p)] [WITHOUT TIMEZONE]
- TIMESTAMP [(p)] [WITHOUT TIMEZONE]
+ INTERVAL [P]
+ STRING (when debezium.interval.handling.mode is set to string)
-
- CHAR(n)
- CHARACTER(n)
- VARCHAR(n)
- CHARACTER VARYING(n)
- CHAR(n)
+ BYTEA
+ BYTES or STRING (when debezium.binary.handling.mode is set to base64 or base64-url-safe or hex)
- TEXT
+ JSON
+ JSONB
+ XML
+ UUID
+ POINT
+ LTREE
+ CITEXT
+ INET
+ INT4RANGE
+ INT8RANGE
+ NUMRANGE
+ TSRANGE
+ DATERANGE
+ ENUM
+
STRING
-
- BYTEA
- BYTES
-
-### Postgres Spatial Data Types Mapping
+### Temporal types Mapping
+Other than PostgreSQL's TIMESTAMPTZ data type, which contains time zone information, how temporal types are mapped depends on the value of the debezium.time.precision.mode connector configuration property. The following sections describe these mappings:
+- debezium.time.precision.mode=adaptive
+- debezium.time.precision.mode=adaptive_time_microseconds
+- debezium.time.precision.mode=connect
+
+Note: Due to current CDC limitations in supporting time types, when debezium.time.precision.mode is set to adaptive, adaptive_time_microseconds, or connect, all TIME values are converted to INTEGER with a precision of 3. This will be improved in future updates.
+
+
+debezium.time.precision.mode=adaptive
+
+When the debezium.time.precision.mode property is set to adaptive (the default), the connector determines the literal type and semantic type based on the column's data type definition. This ensures that events exactly represent the values in the database.
+
+
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type</th>
+        <th class="text-left">CDC type</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>DATE</td>
+        <td>DATE</td>
+      </tr>
+      <tr>
+        <td>TIME([P])</td>
+        <td>INTEGER</td>
+      </tr>
+      <tr>
+        <td>TIMESTAMP([P])</td>
+        <td>TIMESTAMP([P])</td>
+      </tr>
+    </tbody>
+</table>
+
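As a quick sanity check of the adaptive mapping above, here is a minimal, self-contained sketch (illustrative only, not connector code) of how a TIME value ends up as an INTEGER holding milliseconds of the day, i.e. precision 3:

```java
import java.time.LocalTime;

public class AdaptiveTimeSketch {
    // Milliseconds of the day, truncated to precision 3 -- the INTEGER
    // representation that TIME([P]) columns currently map to.
    static int toMillisOfDay(LocalTime t) {
        return (int) (t.toNanoOfDay() / 1_000_000L);
    }

    public static void main(String[] args) {
        System.out.println(toMillisOfDay(LocalTime.parse("18:00:22")));        // 64822000
        System.out.println(toMillisOfDay(LocalTime.parse("18:00:22.123456"))); // 64822123
    }
}
```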
+### Decimal types Mapping
+The setting of the PostgreSQL connector configuration property debezium.decimal.handling.mode determines how the connector maps decimal types.
+
+When the debezium.decimal.handling.mode property is set to precise, the connector uses the Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL, NUMERIC and MONEY columns. This is the default mode.
+
+
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type</th>
+        <th class="text-left">CDC type</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>NUMERIC[(M[,D])]</td>
+        <td>DECIMAL[(M[,D])]</td>
+      </tr>
+      <tr>
+        <td>NUMERIC</td>
+        <td>DECIMAL(38,0)</td>
+      </tr>
+      <tr>
+        <td>DECIMAL[(M[,D])]</td>
+        <td>DECIMAL[(M[,D])]</td>
+      </tr>
+      <tr>
+        <td>DECIMAL</td>
+        <td>DECIMAL(38,0)</td>
+      </tr>
+      <tr>
+        <td>MONEY[(M[,D])]</td>
+        <td>DECIMAL(38, digits) (the scale, digits, is determined by the money.fraction.digits connector configuration property)</td>
+      </tr>
+    </tbody>
+</table>
+
+When the debezium.decimal.handling.mode property is set to double, the connector represents all DECIMAL, NUMERIC and MONEY values as Java double values and encodes them as shown in the following table.
+
+
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type</th>
+        <th class="text-left">CDC type</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>NUMERIC[(M[,D])]</td>
+        <td>DOUBLE</td>
+      </tr>
+      <tr>
+        <td>DECIMAL[(M[,D])]</td>
+        <td>DOUBLE</td>
+      </tr>
+      <tr>
+        <td>MONEY[(M[,D])]</td>
+        <td>DOUBLE</td>
+      </tr>
+    </tbody>
+</table>
+
+The last possible setting for the debezium.decimal.handling.mode configuration property is string. In this case, the connector represents DECIMAL, NUMERIC and MONEY values as their formatted string representation, and encodes them as shown in the following table.
+
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type</th>
+        <th class="text-left">CDC type</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>NUMERIC[(M[,D])]</td>
+        <td>STRING</td>
+      </tr>
+      <tr>
+        <td>DECIMAL[(M[,D])]</td>
+        <td>STRING</td>
+      </tr>
+      <tr>
+        <td>MONEY[(M[,D])]</td>
+        <td>STRING</td>
+      </tr>
+    </tbody>
+</table>
+
+PostgreSQL supports NaN (not a number) as a special value that can be stored in DECIMAL/NUMERIC columns. When debezium.decimal.handling.mode is set to double or string, the connector encodes NaN as Double.NaN or the string constant NAN, respectively.
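A minimal sketch of that encoding rule (illustrative only; the actual mode handling lives in Debezium, not in this helper):

```java
import java.math.BigDecimal;

public class NanEncodingSketch {
    // How a NUMERIC value (possibly NaN) surfaces per decimal.handling.mode.
    static Object encode(String literal, String mode) {
        boolean isNaN = "NaN".equalsIgnoreCase(literal);
        switch (mode) {
            case "double":
                return isNaN ? Double.NaN : Double.parseDouble(literal);
            case "string":
                return isNaN ? "NAN" : literal; // NAN is the string constant used for NaN
            default: // precise: NaN cannot be represented as a BigDecimal
                return new BigDecimal(literal);
        }
    }
}
```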
+
+### HSTORE type Mapping
+The setting of the PostgreSQL connector configuration property debezium.hstore.handling.mode determines how the connector maps HSTORE values.
+
+When the debezium.hstore.handling.mode property is set to json (the default), the connector represents HSTORE values as string representations of JSON values and encodes them as shown in the following table. When the property is set to map, the connector uses the MAP schema type for HSTORE values.
+
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type</th>
+        <th class="text-left">CDC type</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>HSTORE</td>
+        <td>STRING (when `debezium.hstore.handling.mode`=`json`)</td>
+      </tr>
+      <tr>
+        <td>HSTORE</td>
+        <td>MAP (when `debezium.hstore.handling.mode`=`map`)</td>
+      </tr>
+    </tbody>
+</table>
+
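For example, the HSTORE cell 'a => 1, b => 2' surfaces in the two modes roughly as follows (an illustrative sketch, not connector code):

```java
import java.util.HashMap;
import java.util.Map;

public class HstoreModesSketch {
    public static void main(String[] args) {
        // hstore.handling.mode=json (default): the cell arrives as one JSON string.
        String asJson = "{\"a\":\"1\",\"b\":\"2\"}";

        // hstore.handling.mode=map: the cell arrives as a MAP<STRING, STRING>.
        Map<String, String> asMap = new HashMap<>();
        asMap.put("a", "1");
        asMap.put("b", "2");

        System.out.println(asJson + " vs " + asMap);
    }
}
```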
+### Network address types Mapping
+PostgreSQL has data types that can store IPv4, IPv6, and MAC addresses. It is better to use these types instead of plain text types to store network addresses. Network address types offer input error checking and specialized operators and functions.
+
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type</th>
+        <th class="text-left">CDC type</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>INET</td>
+        <td>STRING</td>
+      </tr>
+      <tr>
+        <td>CIDR</td>
+        <td>STRING</td>
+      </tr>
+      <tr>
+        <td>MACADDR</td>
+        <td>STRING</td>
+      </tr>
+      <tr>
+        <td>MACADDR8</td>
+        <td>STRING</td>
+      </tr>
+    </tbody>
+</table>
+
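Since all network columns arrive as plain STRING values, consumers that need typed handling can re-parse them downstream; a minimal sketch:

```java
import java.net.InetAddress;

public class InetSketch {
    public static void main(String[] args) throws Exception {
        // An INET column value such as "192.168.1.1" is emitted as a STRING
        // and can be re-parsed if address semantics are needed.
        InetAddress addr = InetAddress.getByName("192.168.1.1");
        System.out.println(addr.getHostAddress()); // 192.168.1.1
    }
}
```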
+### PostGIS Types Mapping
PostgreSQL supports spatial data types through the PostGIS extension:
```
GEOMETRY(POINT, xx): Represents a point in a Cartesian coordinate system, with EPSG:xx defining the coordinate system. It is suitable for local planar calculations.
@@ -392,11 +617,11 @@ The former is used for small-area planar data, while the latter is used for larg
GEOMETRY(POINT, xx)
- {"hexewkb":"0101000020730c00001c7c613255de6540787aa52c435c42c0","srid":3187}
+ {"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}"
GEOGRAPHY(MULTILINESTRING)
- {"hexewkb":"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0","srid":4326}
+ {"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
index 560ef72d26d..a37c35c5216 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
@@ -30,11 +30,14 @@
import io.debezium.data.Envelope;
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
-import io.debezium.util.HexConverter;
+import io.debezium.data.geometry.Point;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.io.WKBReader;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -48,8 +51,6 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchem
private static final long serialVersionUID = 1L;
private List<PostgresReadableMetadata> readableMetadataList;
- public static final String SRID = "srid";
- public static final String HEXEWKB = "hexewkb";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
@@ -111,15 +112,37 @@ protected Map getMetadata(SourceRecord record) {
protected Object convertToString(Object dbzObj, Schema schema) {
// the Geometry datatype in PostgreSQL will be converted to
// a String with Json format
- if (Geometry.LOGICAL_NAME.equals(schema.name())
+ if (Point.LOGICAL_NAME.equals(schema.name())
+ || Geometry.LOGICAL_NAME.equals(schema.name())
|| Geography.LOGICAL_NAME.equals(schema.name())) {
try {
Struct geometryStruct = (Struct) dbzObj;
byte[] wkb = geometryStruct.getBytes("wkb");
- Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32(SRID));
- Map<String, Object> geometryInfo = new HashMap<>(2);
- geometryInfo.put(HEXEWKB, HexConverter.convertToHexString(wkb));
- geometryInfo.put(SRID, srid.orElse(0));
+
+ WKBReader wkbReader = new WKBReader();
+ org.locationtech.jts.geom.Geometry jtsGeom = wkbReader.read(wkb);
+
+ Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32("srid"));
+ Map<String, Object> geometryInfo = new HashMap<>();
+ String geometryType = jtsGeom.getGeometryType();
+ geometryInfo.put("type", geometryType);
+
+ if (geometryType.equals("GeometryCollection")) {
+ geometryInfo.put("geometries", jtsGeom.toText());
+ } else {
+ Coordinate[] coordinates = jtsGeom.getCoordinates();
+ List<double[]> coordinateList = new ArrayList<>();
+ if (coordinates != null) {
+ for (Coordinate coordinate : coordinates) {
+ coordinateList.add(new double[] {coordinate.x, coordinate.y});
+ }
+ }
+ geometryInfo.put(
+ "coordinates", OBJECT_MAPPER.writeValueAsString(coordinateList));
+ }
+ geometryInfo.put("srid", srid.orElse(0));
return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo));
} catch (Exception e) {
throw new IllegalArgumentException(
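To illustrate what the converter above receives and produces, here is a standalone JTS round-trip (a sketch under the assumption that jts-core is on the classpath; it is not part of the patch):

```java
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKBWriter;

public class WkbRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Build the doc's example point, serialize it to WKB, and parse it back --
        // the same parsing step the deserializer applies to Debezium's "wkb" bytes.
        Point point = new GeometryFactory().createPoint(new Coordinate(174.9479, -36.7208));
        byte[] wkb = new WKBWriter().write(point);

        org.locationtech.jts.geom.Geometry parsed = new WKBReader().read(wkb);
        System.out.println(parsed.getGeometryType()); // Point
        Coordinate c = parsed.getCoordinates()[0];
        System.out.println(c.x + "," + c.y);          // 174.9479,-36.7208
    }
}
```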
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
index e82e5e8efa7..90a66978029 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
@@ -24,6 +24,7 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
+import io.debezium.data.geometry.Point;
import org.apache.kafka.connect.data.Schema;
/** {@link DataType} inference for PostgresSQL debezium {@link Schema}. */
@@ -35,7 +36,8 @@ public class PostgresSchemaDataTypeInference extends DebeziumSchemaDataTypeInfer
protected DataType inferStruct(Object value, Schema schema) {
// the Geometry datatype in PostgresSQL will be converted to
// a String with Json format
- if (Geography.LOGICAL_NAME.equals(schema.name())
+ if (Point.LOGICAL_NAME.equals(schema.name())
+ || Geography.LOGICAL_NAME.equals(schema.name())
|| Geometry.LOGICAL_NAME.equals(schema.name())) {
return DataTypes.STRING();
} else {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
index d2200531c49..0f3b0580644 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
@@ -23,9 +23,11 @@
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresObjectUtils;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTopicSelector;
+import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
@@ -165,16 +167,18 @@ public static Schema getTableSchema(
topicSelector,
valueConverterBuilder.build(jdbc.getTypeRegistry()));
Table tableSchema = postgresSchema.tableFor(tableId);
- return toSchema(tableSchema);
+ return toSchema(
+ tableSchema, sourceConfig.getDbzConnectorConfig(), jdbc.getTypeRegistry());
} catch (SQLException e) {
throw new RuntimeException("Failed to initialize PostgresReplicationConnection", e);
}
}
- public static Schema toSchema(Table table) {
+ public static Schema toSchema(
+ Table table, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) {
List<Column> columns =
table.columns().stream()
- .map(PostgresSchemaUtils::toColumn)
+ .map(column -> toColumn(column, dbzConfig, typeRegistry))
.collect(Collectors.toList());
return Schema.newBuilder()
@@ -184,16 +188,21 @@ public static Schema toSchema(Table table) {
.build();
}
- public static Column toColumn(io.debezium.relational.Column column) {
+ public static Column toColumn(
+ io.debezium.relational.Column column,
+ PostgresConnectorConfig dbzConfig,
+ TypeRegistry typeRegistry) {
if (column.defaultValueExpression().isPresent()) {
return Column.physicalColumn(
column.name(),
- PostgresTypeUtils.fromDbzColumn(column),
+ PostgresTypeUtils.fromDbzColumn(column, dbzConfig, typeRegistry),
column.comment(),
column.defaultValueExpression().get());
} else {
return Column.physicalColumn(
- column.name(), PostgresTypeUtils.fromDbzColumn(column), column.comment());
+ column.name(),
+ PostgresTypeUtils.fromDbzColumn(column, dbzConfig, typeRegistry),
+ column.comment());
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
index 8af4057f9f0..0465563fb80 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
@@ -22,52 +22,24 @@
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.table.types.logical.DecimalType;
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.connector.postgresql.PgOid;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresType;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
+import static io.debezium.connector.postgresql.PostgresConnectorConfig.MONEY_FRACTION_DIGITS;
+
/** A utility class for converting Postgres types to Flink types. */
public class PostgresTypeUtils {
- private static final String PG_SMALLSERIAL = "smallserial";
- private static final String PG_SERIAL = "serial";
- private static final String PG_BIGSERIAL = "bigserial";
- private static final String PG_BYTEA = "bytea";
- private static final String PG_BYTEA_ARRAY = "_bytea";
- private static final String PG_SMALLINT = "int2";
- private static final String PG_SMALLINT_ARRAY = "_int2";
- private static final String PG_INTEGER = "int4";
- private static final String PG_INTEGER_ARRAY = "_int4";
- private static final String PG_BIGINT = "int8";
- private static final String PG_BIGINT_ARRAY = "_int8";
- private static final String PG_REAL = "float4";
- private static final String PG_REAL_ARRAY = "_float4";
- private static final String PG_DOUBLE_PRECISION = "float8";
- private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
- private static final String PG_NUMERIC = "numeric";
- private static final String PG_NUMERIC_ARRAY = "_numeric";
- private static final String PG_BOOLEAN = "bool";
- private static final String PG_BOOLEAN_ARRAY = "_bool";
- private static final String PG_TIMESTAMP = "timestamp";
- private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
- private static final String PG_TIMESTAMPTZ = "timestamptz";
- private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
- private static final String PG_DATE = "date";
- private static final String PG_DATE_ARRAY = "_date";
- private static final String PG_TIME = "time";
- private static final String PG_TIME_ARRAY = "_time";
- private static final String PG_TEXT = "text";
- private static final String PG_TEXT_ARRAY = "_text";
- private static final String PG_CHAR = "bpchar";
- private static final String PG_CHAR_ARRAY = "_bpchar";
- private static final String PG_CHARACTER = "character";
- private static final String PG_CHARACTER_ARRAY = "_character";
- private static final String PG_CHARACTER_VARYING = "varchar";
- private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
- private static final String PG_UUID = "uuid";
- private static final String PG_GEOMETRY = "geometry";
- private static final String PG_GEOGRAPHY = "geography";
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
- public static DataType fromDbzColumn(Column column) {
- DataType dataType = convertFromColumn(column);
+ public static DataType fromDbzColumn(
+ Column column, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) {
+ DataType dataType = convertFromColumn(column, dbzConfig, typeRegistry);
if (column.isOptional()) {
return dataType;
} else {
@@ -79,92 +51,258 @@ public static DataType fromDbzColumn(Column column) {
* Returns a corresponding Flink data type from a debezium {@link Column} with nullable always
* be true.
*/
- private static DataType convertFromColumn(Column column) {
- String typeName = column.typeName();
+ private static DataType convertFromColumn(
+ Column column, PostgresConnectorConfig dbzConfig, TypeRegistry typeRegistry) {
+ int nativeType = column.nativeType();
int precision = column.length();
int scale = column.scale().orElse(0);
- switch (typeName) {
- case PG_BOOLEAN:
+ PostgresConnectorConfig.IntervalHandlingMode intervalHandlingMode =
+ PostgresConnectorConfig.IntervalHandlingMode.parse(
+ dbzConfig
+ .getConfig()
+ .getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE));
+
+ PostgresConnectorConfig.BinaryHandlingMode binaryHandlingMode =
+ dbzConfig.binaryHandlingMode();
+
+ TemporalPrecisionMode temporalPrecisionMode = dbzConfig.getTemporalPrecisionMode();
+
+ JdbcValueConverters.DecimalMode decimalMode =
+ dbzConfig.getDecimalMode() != null
+ ? dbzConfig.getDecimalMode()
+ : JdbcValueConverters.DecimalMode.PRECISE;
+
+ PostgresConnectorConfig.HStoreHandlingMode hStoreHandlingMode =
+ PostgresConnectorConfig.HStoreHandlingMode.parse(
+ dbzConfig
+ .getConfig()
+ .getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE));
+
+ switch (nativeType) {
+ case PgOid.BOOL:
return DataTypes.BOOLEAN();
- case PG_BOOLEAN_ARRAY:
+ case PgOid.BIT:
+ case PgOid.VARBIT:
+ if (precision == 1) {
+ return DataTypes.BOOLEAN();
+ } else {
+ return DataTypes.BINARY(precision);
+ }
+ case PgOid.BOOL_ARRAY:
return DataTypes.ARRAY(DataTypes.BOOLEAN());
- case PG_BYTEA:
- return DataTypes.BYTES();
- case PG_BYTEA_ARRAY:
- return DataTypes.ARRAY(DataTypes.BYTES());
- case PG_SMALLINT:
- case PG_SMALLSERIAL:
+ case PgOid.BYTEA:
+ return handleBinaryWithBinaryMode(binaryHandlingMode);
+ case PgOid.BYTEA_ARRAY:
+ return DataTypes.ARRAY(handleBinaryWithBinaryMode(binaryHandlingMode));
+ case PgOid.INT2:
return DataTypes.SMALLINT();
- case PG_SMALLINT_ARRAY:
+ case PgOid.INT2_ARRAY:
return DataTypes.ARRAY(DataTypes.SMALLINT());
- case PG_INTEGER:
- case PG_SERIAL:
+ case PgOid.INT4:
return DataTypes.INT();
- case PG_INTEGER_ARRAY:
+ case PgOid.INT4_ARRAY:
return DataTypes.ARRAY(DataTypes.INT());
- case PG_BIGINT:
- case PG_BIGSERIAL:
+ case PgOid.INT8:
+ case PgOid.OID:
return DataTypes.BIGINT();
- case PG_BIGINT_ARRAY:
+ case PgOid.INTERVAL:
+ return handleIntervalWithIntervalHandlingMode(intervalHandlingMode);
+ case PgOid.INTERVAL_ARRAY:
+ return DataTypes.ARRAY(
+ handleIntervalWithIntervalHandlingMode(intervalHandlingMode));
+ case PgOid.INT8_ARRAY:
return DataTypes.ARRAY(DataTypes.BIGINT());
- case PG_REAL:
+ case PgOid.FLOAT4:
return DataTypes.FLOAT();
- case PG_REAL_ARRAY:
+ case PgOid.FLOAT4_ARRAY:
return DataTypes.ARRAY(DataTypes.FLOAT());
- case PG_DOUBLE_PRECISION:
+ case PgOid.FLOAT8:
return DataTypes.DOUBLE();
- case PG_DOUBLE_PRECISION_ARRAY:
+ case PgOid.FLOAT8_ARRAY:
return DataTypes.ARRAY(DataTypes.DOUBLE());
- case PG_NUMERIC:
+ case PgOid.NUMERIC:
// see SPARK-26538: handle numeric without explicit precision and scale.
- if (precision > 0) {
- return DataTypes.DECIMAL(precision, scale);
- }
- return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0);
- case PG_NUMERIC_ARRAY:
+ return handleNumericWithDecimalMode(precision, scale, decimalMode);
+ case PgOid.NUMERIC_ARRAY:
// see SPARK-26538: handle numeric without explicit precision and scale.
- if (precision > 0) {
- return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale));
- }
- return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0));
- case PG_CHAR:
- case PG_CHARACTER:
+ return DataTypes.ARRAY(handleNumericWithDecimalMode(precision, scale, decimalMode));
+ case PgOid.MONEY:
+ return handleMoneyWithDecimalMode(
+ dbzConfig.getConfig().getInteger(MONEY_FRACTION_DIGITS), decimalMode);
+ case PgOid.CHAR:
+ case PgOid.BPCHAR:
return DataTypes.CHAR(precision);
- case PG_CHAR_ARRAY:
- case PG_CHARACTER_ARRAY:
+ case PgOid.CHAR_ARRAY:
+ case PgOid.BPCHAR_ARRAY:
return DataTypes.ARRAY(DataTypes.CHAR(precision));
- case PG_CHARACTER_VARYING:
+ case PgOid.VARCHAR:
return DataTypes.VARCHAR(precision);
- case PG_CHARACTER_VARYING_ARRAY:
+ case PgOid.VARCHAR_ARRAY:
return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
- case PG_TEXT:
- case PG_GEOMETRY:
- case PG_GEOGRAPHY:
- case PG_UUID:
+ case PgOid.TEXT:
+ case PgOid.POINT:
+ case PgOid.UUID:
+ case PgOid.JSON:
+ case PgOid.JSONB:
+ case PgOid.XML:
+ case PgOid.INET_OID:
+ case PgOid.CIDR_OID:
+ case PgOid.MACADDR_OID:
+ case PgOid.MACADDR8_OID:
+ case PgOid.INT4RANGE_OID:
+ case PgOid.NUM_RANGE_OID:
+ case PgOid.INT8RANGE_OID:
+ case PgOid.TSRANGE_OID:
+ case PgOid.TSTZRANGE_OID:
+ case PgOid.DATERANGE_OID:
return DataTypes.STRING();
- case PG_TEXT_ARRAY:
+ case PgOid.TEXT_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());
- case PG_TIMESTAMP:
- return DataTypes.TIMESTAMP(scale);
- case PG_TIMESTAMP_ARRAY:
- return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
- case PG_TIMESTAMPTZ:
+ case PgOid.TIMESTAMP:
+ return handleTimestampWithTemporalMode(temporalPrecisionMode, scale);
+ case PgOid.TIMESTAMP_ARRAY:
+ return DataTypes.ARRAY(
+ handleTimestampWithTemporalMode(temporalPrecisionMode, scale));
+ case PgOid.TIMESTAMPTZ:
return new ZonedTimestampType(scale);
- case PG_TIMESTAMPTZ_ARRAY:
+ case PgOid.TIMESTAMPTZ_ARRAY:
return DataTypes.ARRAY(new ZonedTimestampType(scale));
- case PG_TIME:
- return DataTypes.TIME(scale);
- case PG_TIME_ARRAY:
- return DataTypes.ARRAY(DataTypes.TIME(scale));
- case PG_DATE:
- return DataTypes.DATE();
- case PG_DATE_ARRAY:
- return DataTypes.ARRAY(DataTypes.DATE());
+ case PgOid.TIME:
+ return handleTimeWithTemporalMode(temporalPrecisionMode, scale);
+ case PgOid.TIME_ARRAY:
+ return DataTypes.ARRAY(handleTimeWithTemporalMode(temporalPrecisionMode, scale));
+ case PgOid.DATE:
+ return handleDateWithTemporalMode(temporalPrecisionMode);
+ case PgOid.DATE_ARRAY:
+ return DataTypes.ARRAY(handleDateWithTemporalMode(temporalPrecisionMode));
default:
+ if (nativeType == typeRegistry.ltreeOid()
+ || nativeType == typeRegistry.geometryOid()
+ || nativeType == typeRegistry.geographyOid()
+ || nativeType == typeRegistry.citextOid()) {
+ return DataTypes.STRING();
+ } else if (nativeType == typeRegistry.hstoreOid()) {
+ return handleHstoreWithHstoreMode(hStoreHandlingMode);
+ } else if (nativeType == typeRegistry.ltreeArrayOid()
+ || nativeType == typeRegistry.geometryArrayOid()) {
+ return DataTypes.ARRAY(DataTypes.STRING());
+ }
+ final PostgresType resolvedType = typeRegistry.get(nativeType);
+ if (resolvedType.isEnumType()) {
+ return DataTypes.STRING();
+ }
throw new UnsupportedOperationException(
- String.format("Doesn't support Postgres type '%s' yet", typeName));
+ String.format(
+ "Doesn't support Postgres type '%s', Postgres oid '%d' yet",
+ column.typeName(), column.nativeType()));
+ }
+ }
+
+ public static DataType handleNumericWithDecimalMode(
+ int precision, int scale, JdbcValueConverters.DecimalMode mode) {
+ switch (mode) {
+ case PRECISE:
+ if (precision > DecimalType.DEFAULT_SCALE
+ && precision <= DecimalType.MAX_PRECISION) {
+ return DataTypes.DECIMAL(precision, scale);
+ }
+ return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, DecimalType.DEFAULT_SCALE);
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+ case STRING:
+ return DataTypes.STRING();
+ default:
+ throw new IllegalArgumentException("Unknown decimal mode: " + mode);
+ }
+ }
+
+ public static DataType handleBinaryWithBinaryMode(
+ CommonConnectorConfig.BinaryHandlingMode mode) {
+ switch (mode) {
+ case BYTES:
+ return DataTypes.BYTES();
+ case BASE64:
+ case HEX:
+ return DataTypes.STRING();
+ default:
+ throw new IllegalArgumentException("Unknown binary mode: " + mode);
+ }
+ }
+
+ public static DataType handleMoneyWithDecimalMode(
+ int moneyFractionDigits, JdbcValueConverters.DecimalMode mode) {
+ switch (mode) {
+ case PRECISE:
+ return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, moneyFractionDigits);
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+ case STRING:
+ return DataTypes.STRING();
+ default:
+ throw new IllegalArgumentException("Unknown decimal mode: " + mode);
+ }
+ }
+
+ public static DataType handleIntervalWithIntervalHandlingMode(
+ PostgresConnectorConfig.IntervalHandlingMode mode) {
+ switch (mode) {
+ case NUMERIC:
+ return DataTypes.BIGINT();
+ case STRING:
+ return DataTypes.STRING();
+ default:
+ throw new IllegalArgumentException("Unknown interval mode: " + mode);
+ }
+ }
+
+ public static DataType handleDateWithTemporalMode(TemporalPrecisionMode mode) {
+ switch (mode) {
+ case ADAPTIVE:
+ case ADAPTIVE_TIME_MICROSECONDS:
+ case CONNECT:
+ return DataTypes.DATE();
+ default:
+ throw new IllegalArgumentException("Unknown temporal precision mode: " + mode);
+ }
+ }
+
+ public static DataType handleTimeWithTemporalMode(TemporalPrecisionMode mode, int scale) {
+ switch (mode) {
+ case ADAPTIVE:
+ case ADAPTIVE_TIME_MICROSECONDS:
+ case CONNECT:
+ return DataTypes.INT();
+ default:
+ throw new IllegalArgumentException("Unknown temporal precision mode: " + mode);
+ }
+ }
+
+ public static DataType handleTimestampWithTemporalMode(TemporalPrecisionMode mode, int scale) {
+ switch (mode) {
+ case ADAPTIVE:
+ case ADAPTIVE_TIME_MICROSECONDS:
+ case CONNECT:
+ return DataTypes.TIMESTAMP(scale);
+ default:
+ throw new IllegalArgumentException("Unknown temporal precision mode: " + mode);
+ }
+ }
+
+ public static DataType handleHstoreWithHstoreMode(
+ PostgresConnectorConfig.HStoreHandlingMode mode) {
+ switch (mode) {
+ case JSON:
+ return DataTypes.STRING();
+ case MAP:
+ return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
+ default:
+ throw new IllegalArgumentException("Unknown hstore mode: " + mode);
}
}
}
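A quick usage sketch of the new helpers (assuming PostgresTypeUtils is on the classpath; the printed strings are indicative of the DataType summary format):

```java
import io.debezium.jdbc.JdbcValueConverters;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;

public class TypeMappingSketch {
    public static void main(String[] args) {
        // Unconstrained NUMERIC (precision 0) falls back to DECIMAL(38, 0) in precise mode.
        DataType numeric =
                PostgresTypeUtils.handleNumericWithDecimalMode(
                        0, 0, JdbcValueConverters.DecimalMode.PRECISE);

        // MONEY takes its scale from money.fraction.digits (Debezium default: 2).
        DataType money =
                PostgresTypeUtils.handleMoneyWithDecimalMode(
                        2, JdbcValueConverters.DecimalMode.PRECISE);

        System.out.println(numeric); // DECIMAL(38, 0)
        System.out.println(money);   // DECIMAL(38, 2)
    }
}
```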
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index 3dc04f59dc5..9da05181f8d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -21,8 +21,10 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -59,10 +61,16 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.stream.Stream;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
@@ -193,15 +201,506 @@ public void testFullTypes() throws Exception {
64822000,
DecimalData.fromBigDecimal(new BigDecimal("500"), 10, 0),
BinaryStringData.fromString(
- "{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187}"),
+ "{\"coordinates\":\"[[174.9479,-36.7208]]\",\"type\":\"Point\",\"srid\":3187}"),
BinaryStringData.fromString(
- "{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326}")
+ "{\"coordinates\":\"[[169.1321,-44.7032],[167.8974,-44.6414]]\",\"type\":\"MultiLineString\",\"srid\":4326}"),
+ true,
+ new byte[] {10},
+ new byte[] {42},
+ BinaryStringData.fromString("abc"),
+ 1209600000000L,
+ BinaryStringData.fromString(
+ "{\"order_id\": 10248, \"product\": \"Notebook\", \"quantity\": 5}"),
+ BinaryStringData.fromString(
+ "{\"product\": \"Pen\", \"order_id\": 10249, \"quantity\": 10}"),
+ BinaryStringData.fromString(
+ "\n"
+ + "123 \n"
+ + "Alice \n"
+ + "alice@example.com \n"
+ + "\n"
+ + "dark \n"
+ + "true \n"
+ + " \n"
+ + " "),
+ BinaryStringData.fromString(
+ "{\"coordinates\":\"[[3.456,7.89]]\",\"type\":\"Point\",\"srid\":0}"),
+ BinaryStringData.fromString("foo.bar.baz"),
+ BinaryStringData.fromString("JohnDoe"),
+ BinaryStringData.fromString("{\"size\":\"L\",\"color\":\"blue\"}"),
+ BinaryStringData.fromString("192.168.1.1"),
+ BinaryStringData.fromString("[1,10)"),
+ BinaryStringData.fromString("[1000000000,5000000000)"),
+ BinaryStringData.fromString("[5.5,20.75)"),
+ BinaryStringData.fromString(
+ "[\"2023-08-01 08:00:00\",\"2023-08-01 12:00:00\")"),
+ BinaryStringData.fromString("[2023-08-01,2023-08-15)"),
+ BinaryStringData.fromString("pending"),
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+
+ Assertions.assertThat(recordFields(snapshotRecord, COMMON_TYPES))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("time.precision.mode", "adaptive");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("inventory.time_types")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 2,
+ 18460,
+ 64822000,
+ 64822123,
+ 64822123,
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2020-07-17T18:00:22.123456")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testHandlingDecimalModePrecise() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("decimal.handling.mode", "precise");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("test_decimal.decimal_test_table")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 1,
+ DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2),
+ DecimalData.fromBigDecimal(new BigDecimal("67.8912"), 8, 4),
+ DecimalData.fromBigDecimal(new BigDecimal("987.65"), 5, 2),
+ DecimalData.fromBigDecimal(new BigDecimal("12.3"), 3, 1),
+ DecimalData.fromBigDecimal(new BigDecimal("100.50"), 38, 2),
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+
+ Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_PRECISE))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testHandlingDecimalModeDouble() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("decimal.handling.mode", "double");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("test_decimal.decimal_test_table")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 1, 123.45, 67.8912, 987.65, 12.3, 100.50,
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+
+ Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_DOUBLE))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testHandlingDecimalModeString() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("decimal.handling.mode", "string");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("test_decimal.decimal_test_table")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 1,
+ BinaryStringData.fromString("123.45"),
+ BinaryStringData.fromString("67.8912"),
+ BinaryStringData.fromString("987.65"),
+ BinaryStringData.fromString("12.3"),
+ BinaryStringData.fromString("100.5"),
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_STRING))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testZeroHandlingDecimalModePrecise() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("decimal.handling.mode", "precise");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("test_decimal.decimal_test_zero")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 2,
+ DecimalData.fromBigDecimal(new BigDecimal("99999999.99"), 10, 2),
+ DecimalData.fromBigDecimal(new BigDecimal("9999.9999"), 8, 4),
+ null,
+ null,
+ null,
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_PRECISE))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testZeroHandlingDecimalModeDouble() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("decimal.handling.mode", "double");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("test_decimal.decimal_test_zero")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 2, 99999999.99, 9999.9999, null, null, null,
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_DOUBLE))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testZeroHandlingDecimalModeString() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("decimal.handling.mode", "string");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("test_decimal.decimal_test_zero")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 2,
+ BinaryStringData.fromString("99999999.99"),
+ BinaryStringData.fromString("9999.9999"),
+ null,
+ null,
+ null,
};
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Assertions.assertThat(recordFields(snapshotRecord, TYPES_WITH_STRING))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testHstoreHandlingModeMap() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("hstore.handling.mode", "map");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("inventory.hstore_types")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put(BinaryStringData.fromString("a"), BinaryStringData.fromString("1"));
+ expectedMap.put(BinaryStringData.fromString("b"), BinaryStringData.fromString("2"));
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Object[] snapshotObjects = recordFields(snapshotRecord, HSTORE_TYPES_WITH_ADAPTIVE);
+ Map<Object, Object> snapshotMap =
+ ((BinaryMapData) snapshotObjects[1])
+ .toJavaMap(DataTypes.STRING(), DataTypes.STRING());
+ Assertions.assertThat(expectedMap).isEqualTo(snapshotMap);
+ }
+
+ @Test
+ public void testJsonTypes() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("hstore.handling.mode", "map");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("inventory.json_types")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
- Assertions.assertThat(recordFields(snapshotRecord, PG_TYPES)).isEqualTo(expectedSnapshot);
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 1,
+ BinaryStringData.fromString("{\"key1\":\"value1\"}"),
+ BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"),
+ BinaryStringData.fromString(
+ "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
+ BinaryStringData.fromBytes("{\"key1\": \"value1\"}".getBytes()),
+ BinaryStringData.fromBytes(
+ "{\"key1\": \"value1\", \"key2\": \"value2\"}".getBytes()),
+ BinaryStringData.fromBytes(
+ "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"
+ .getBytes()),
+ 1L
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Assertions.assertThat(recordFields(snapshotRecord, JSON_TYPES)).isEqualTo(expectedSnapshot);
}
private Tuple2<List<Event>, List<CreateTableEvent>> fetchResultsAndCreateTableEvent(
@@ -237,7 +736,11 @@ private Object[] recordFields(RecordData record, RowType rowType) {
return fields;
}
- private static final RowType PG_TYPES =
+ private Instant toInstant(String ts) {
+ return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC+8")).toInstant();
+ }
+
+ private static final RowType COMMON_TYPES =
RowType.of(
DataTypes.INT(),
DataTypes.BYTES(),
@@ -259,5 +762,80 @@ private Object[] recordFields(RecordData record, RowType rowType) {
DataTypes.TIME(0),
DataTypes.DECIMAL(DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE),
DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.BOOLEAN(),
+ DataTypes.BINARY(8),
+ DataTypes.BINARY(20),
+ DataTypes.CHAR(3),
+ DataTypes.BIGINT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
DataTypes.STRING());
+
+ private static final RowType TYPES_WITH_PRECISE =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.DECIMAL(10, 2),
+ DataTypes.DECIMAL(8, 4),
+ DataTypes.DECIMAL(5, 2),
+ DataTypes.DECIMAL(3, 1),
+ DataTypes.DECIMAL(38, 2));
+
+ private static final RowType TYPES_WITH_DOUBLE =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.DOUBLE(),
+ DataTypes.DOUBLE(),
+ DataTypes.DOUBLE(),
+ DataTypes.DOUBLE(),
+ DataTypes.DOUBLE());
+
+ private static final RowType TYPES_WITH_STRING =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING());
+
+ private static final RowType TIME_TYPES_WITH_ADAPTIVE =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.DATE(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.TIMESTAMP(0),
+ DataTypes.TIMESTAMP(3),
+ DataTypes.TIMESTAMP(6),
+ DataTypes.TIMESTAMP(),
+ DataTypes.TIMESTAMP_LTZ(0));
+
+ private static final RowType HSTORE_TYPES_WITH_ADAPTIVE =
+ RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
+
+ private static final RowType JSON_TYPES =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.BIGINT());
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
index 4790b0f4918..9ac9d95bbb2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
@@ -21,7 +21,11 @@ DROP SCHEMA IF EXISTS inventory CASCADE;
CREATE SCHEMA inventory;
-- postgis is installed into public schema
SET search_path TO inventory, public;
+CREATE EXTENSION IF NOT EXISTS ltree;
+CREATE EXTENSION IF NOT EXISTS citext;
+CREATE EXTENSION IF NOT EXISTS hstore;
+CREATE TYPE status AS ENUM ('pending', 'approved', 'rejected');
CREATE TABLE full_types
(
@@ -46,6 +50,25 @@ CREATE TABLE full_types
default_numeric_c NUMERIC,
geometry_c GEOMETRY(POINT, 3187),
geography_c GEOGRAPHY(MULTILINESTRING),
+ bit_c BIT(1),
+ bit_fixed_c BIT(8),
+ bit_varying_c BIT VARYING(20),
+ bpchar_c BPCHAR(3),
+ duration_c INTERVAL,
+ json_c JSON,
+ jsonb_c JSONB,
+ xml_c XML,
+ location POINT,
+ ltree_c LTREE,
+ username CITEXT NOT NULL,
+ attributes HSTORE,
+ inet_c INET,
+ int4range_c INT4RANGE,
+ int8range_c INT8RANGE,
+ numrange_c NUMRANGE,
+ tsrange_c TSRANGE,
+ daterange_c DATERANGE,
+ status status NOT NULL,
PRIMARY KEY (id)
);
@@ -56,4 +79,78 @@ INSERT INTO inventory.full_types
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
'2020-07-17', '18:00:22', 500,'SRID=3187;POINT(174.9479 -36.7208)'::geometry,
- 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
\ No newline at end of file
+ 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography,B'1',B'00001010',B'00101010','abc','2 weeks','{"order_id": 10248, "product": "Notebook", "quantity": 5}','{"order_id": 10249, "product": "Pen", "quantity": 10}'::jsonb,'
+ 123
+ Alice
+ alice@example.com
+
+ dark
+ true
+
+ ','(3.456,7.890)'::point,'foo.bar.baz','JohnDoe','color => "blue", size => "L"','192.168.1.1'::inet,'[1, 10)'::int4range,'[1000000000, 5000000000)'::int8range,'[5.5, 20.75)'::numrange,
+ '["2023-08-01 08:00:00", "2023-08-01 12:00:00")','["2023-08-01", "2023-08-15")','pending');
+
+
+CREATE TABLE time_types (
+ id SERIAL PRIMARY KEY,
+ date_c DATE,
+ time_c TIME(0) WITHOUT TIME ZONE,
+ time_3_c TIME(3) WITHOUT TIME ZONE,
+ time_6_c TIME(6) WITHOUT TIME ZONE,
+ datetime_c TIMESTAMP(0) WITHOUT TIME ZONE,
+ datetime3_c TIMESTAMP(3) WITHOUT TIME ZONE,
+ datetime6_c TIMESTAMP(6) WITHOUT TIME ZONE,
+ timestamp_c TIMESTAMP WITHOUT TIME ZONE,
+ timestamp_tz_c TIMESTAMP WITH TIME ZONE
+);
+ALTER TABLE inventory.time_types
+ REPLICA IDENTITY FULL;
+
+INSERT INTO time_types
+VALUES (2,
+ '2020-07-17',
+ '18:00:22',
+ '18:00:22.123',
+ '18:00:22.123456',
+ '2020-07-17 18:00:22',
+ '2020-07-17 18:00:22.123',
+ '2020-07-17 18:00:22.123456',
+ '2020-07-17 18:00:22',
+ '2020-07-17 18:00:22+08:00');
+
+CREATE TABLE hstore_types (
+ id SERIAL PRIMARY KEY,
+ hstore_c HSTORE
+);
+
+ALTER TABLE inventory.hstore_types
+ REPLICA IDENTITY FULL;
+
+INSERT INTO hstore_types
+VALUES (1, 'a => 1, b => 2');
+
+CREATE TABLE json_types (
+ id SERIAL PRIMARY KEY,
+ json_c0 JSON,
+ json_c1 JSON,
+ json_c2 JSON,
+ jsonb_c0 JSONB,
+ jsonb_c1 JSONB,
+ jsonb_c2 JSONB,
+ int_c INTEGER
+);
+
+ALTER TABLE inventory.json_types
+ REPLICA IDENTITY FULL;
+
+INSERT INTO json_types (id,json_c0, json_c1, json_c2, jsonb_c0, jsonb_c1, jsonb_c2, int_c)
+VALUES
+ (1,
+ '{"key1":"value1"}',
+ '{"key1":"value1","key2":"value2"}',
+ '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
+ '{"key1":"value1"}'::jsonb,
+ '{"key1":"value1","key2":"value2"}'::jsonb,
+ '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]'::jsonb,
+ 1
+ );
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql
new file mode 100644
index 00000000000..b2f18d5899f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/decimal_mode_test.sql
@@ -0,0 +1,70 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: decimal_mode_test
+-- ----------------------------------------------------------------------------------------------------------------
+-- Generate a number of tables to cover as many of the PG types as possible
+
+
+DROP SCHEMA IF EXISTS test_decimal CASCADE;
+CREATE SCHEMA IF NOT EXISTS test_decimal;
+
+SET search_path TO test_decimal;
+
+DROP TABLE IF EXISTS decimal_test_table;
+CREATE TABLE decimal_test_table (
+ id SERIAL PRIMARY KEY,
+ fixed_numeric NUMERIC(10,2),
+ fixed_decimal DECIMAL(8,4),
+ variable_numeric NUMERIC,
+ variable_decimal DECIMAL,
+ amount_money MONEY
+);
+
+ALTER TABLE decimal_test_table REPLICA IDENTITY FULL;
+
+INSERT INTO decimal_test_table (
+ id,
+ fixed_numeric,
+ fixed_decimal,
+ variable_numeric,
+ variable_decimal,
+ amount_money
+) VALUES
+(1, 123.45, 67.8912, 987.65, 12.3, '100.50'::money);
+
+
+DROP TABLE IF EXISTS decimal_test_zero;
+CREATE TABLE decimal_test_zero (
+ id SERIAL PRIMARY KEY,
+ fixed_numeric NUMERIC(10,2),
+ fixed_decimal DECIMAL(8,4),
+ variable_numeric NUMERIC,
+ variable_decimal DECIMAL,
+ amount_money MONEY
+);
+
+ALTER TABLE decimal_test_zero REPLICA IDENTITY FULL;
+
+INSERT INTO decimal_test_zero (
+ id,
+ fixed_numeric,
+ fixed_decimal,
+ variable_numeric,
+ variable_decimal,
+ amount_money
+) VALUES
+ (2, 99999999.99, 9999.9999, null, null, null);
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index 338f03d2356..538e8ceef24 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
@@ -31,6 +32,7 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
@@ -222,8 +224,17 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
return convertToRecord((RowType) type, dbzObj, schema);
}
};
- case ARRAY:
case MAP:
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws Exception {
+ return convertToMap(dbzObj, schema);
+ }
+ };
+ case ARRAY:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -426,6 +437,41 @@ private static Object convertField(
}
}
+ protected Object convertToMap(Object dbzObj, Schema schema) throws Exception {
+ if (dbzObj == null) {
+ return null;
+ }
+
+ // Obtain the schema for the keys and values of a Map
+ Schema keySchema = schema.keySchema();
+ Schema valueSchema = schema.valueSchema();
+
+ // Infer the data types of keys and values
+ DataType keyType =
+ keySchema != null
+ ? schemaDataTypeInference.infer(null, keySchema)
+ : DataTypes.STRING();
+
+ DataType valueType =
+ valueSchema != null
+ ? schemaDataTypeInference.infer(null, valueSchema)
+ : DataTypes.STRING();
+
+ DeserializationRuntimeConverter keyConverter = createConverter(keyType);
+ DeserializationRuntimeConverter valueConverter = createConverter(valueType);
+
+ Map<?, ?> map = (Map<?, ?>) dbzObj;
+ Map<Object, Object> convertedMap = new java.util.HashMap<>(map.size());
+
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ Object convertedKey = convertField(keyConverter, entry.getKey(), keySchema);
+ Object convertedValue = convertField(valueConverter, entry.getValue(), valueSchema);
+ convertedMap.put(convertedKey, convertedValue);
+ }
+
+ return new GenericMapData(convertedMap);
+ }
+
private static DeserializationRuntimeConverter wrapIntoNullableConverter(
DeserializationRuntimeConverter converter) {
return new DeserializationRuntimeConverter() {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
index 3afa7128d13..06f4c9d5500 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
@@ -210,6 +210,12 @@ protected DataType inferArray(Object value, Schema schema) {
}
protected DataType inferMap(Object value, Schema schema) {
- throw new UnsupportedOperationException("Unsupported type MAP");
+ Schema keySchema = schema.keySchema();
+ Schema valueSchema = schema.valueSchema();
+
+ DataType keyType = keySchema != null ? infer(null, keySchema) : DataTypes.STRING();
+ DataType valueType = valueSchema != null ? infer(null, valueSchema) : DataTypes.STRING();
+
+ return DataTypes.MAP(keyType, valueType);
}
}
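With this change, a Connect MAP schema (for example, what Debezium emits for HSTORE under hstore.handling.mode=map) infers to a CDC MAP type instead of throwing. A small sketch of the shape of such a schema:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class MapInferenceSketch {
    public static void main(String[] args) {
        // The key/value schemas below drive inferMap's key/value type inference,
        // yielding MAP(STRING, STRING) for a string-to-string Connect map.
        Schema hstoreSchema =
                SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)
                        .optional()
                        .build();
        System.out.println(hstoreSchema.keySchema().type());   // STRING
        System.out.println(hstoreSchema.valueSchema().type()); // STRING
    }
}
```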