diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index 4d5edb66fd3..a5f0924c375 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -393,7 +393,10 @@ source: DOUBLE UNSIGNED ZEROFILL
DOUBLE PRECISION
DOUBLE PRECISION UNSIGNED
- DOUBLE PRECISION UNSIGNED ZEROFILL + DOUBLE PRECISION UNSIGNED ZEROFILL
+ FLOAT(p, s)
+ REAL(p, s)
+ DOUBLE(p, s) DOUBLE diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 36c4cc7708b..3cc5c17a100 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -403,7 +403,10 @@ source: DOUBLE UNSIGNED ZEROFILL
DOUBLE PRECISION
DOUBLE PRECISION UNSIGNED
- DOUBLE PRECISION UNSIGNED ZEROFILL + DOUBLE PRECISION UNSIGNED ZEROFILL
+ FLOAT(p, s)
+ REAL(p, s)
+ DOUBLE(p, s) DOUBLE diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java index 0e70ed6d91f..c82525cf60c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -107,6 +107,7 @@ public class MySqlTypeUtils { private static final String MULTIPOLYGON = "MULTIPOLYGON"; private static final String MULTILINESTRING = "MULTILINESTRING"; private static final String UNKNOWN = "UNKNOWN"; + private static final int FLOAT_LENGTH_UNSPECIFIED_FLAG = -1; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ public static DataType fromDbzColumn(Column column) { @@ -164,7 +165,12 @@ private static DataType convertFromColumn(Column column) { case FLOAT: case FLOAT_UNSIGNED: case FLOAT_UNSIGNED_ZEROFILL: - return DataTypes.FLOAT(); + if (column.length() != FLOAT_LENGTH_UNSPECIFIED_FLAG) { + // For FLOAT types with length provided explicitly, treat it like DOUBLE + return DataTypes.DOUBLE(); + } else { + return DataTypes.FLOAT(); + } case REAL: case REAL_UNSIGNED: case REAL_UNSIGNED_ZEROFILL: @@ -236,7 +242,7 @@ private static DataType convertFromColumn(Column column) { return DataTypes.ARRAY(DataTypes.STRING()); default: throw new UnsupportedOperationException( - String.format("Don't support MySQL type '%s' yet.", typeName)); + String.format("MySQL type '%s' is not supported yet.", typeName)); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java index 2bd1ae689c3..dd15fdf6a4b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java @@ -249,7 +249,18 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable { DataTypes.TIMESTAMP_LTZ(0), DataTypes.TIMESTAMP_LTZ(3), DataTypes.TIMESTAMP_LTZ(6), - DataTypes.TIMESTAMP_LTZ(0)); + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE()); Object[] expectedSnapshot = new Object[] { @@ -265,7 +276,19 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable { TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")), LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")), LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")), - LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")) + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")), + 2d, + 3d, + 5d, + 7d, + 11d, + 13d, + 17d, + 19d, + 23d, + 29d, + 31d, + 37d }; Object[] expectedStreamRecord = @@ -282,7 +305,19 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable { TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")), LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")), LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")), - LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")) + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")), + 2d, + 3d, + 5d, + 7d, + 11d, + 13d, + 17d, + 19d, + 23d, + 29d, + 31d, + 37d }; database.createAndInitialize(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java index 3d3f0276b69..94f1ed45dc5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java @@ -207,6 +207,156 @@ public void testMysql8AccessTimeTypesSchema() { assertThat(actualSchema).isEqualTo(expectedSchema); } + @Test + public void testMysql57PrecisionTypesSchema() { + fullTypesMySql57Database.createAndInitialize(); + + String[] tables = new String[] {"precision_types"}; + MySqlMetadataAccessor metadataAccessor = + getMetadataAccessor(tables, fullTypesMySql57Database); + + Schema actualSchema = + metadataAccessor.getTableSchema( + TableId.tableId( + fullTypesMySql57Database.getDatabaseName(), "precision_types")); + Schema expectedSchema = + Schema.newBuilder() + .primaryKey("id") + .fromRowDataType( + RowType.of( + new DataType[] { + DataTypes.DECIMAL(20, 0).notNull(), + DataTypes.DECIMAL(6, 2), + DataTypes.DECIMAL(9, 4), + DataTypes.DECIMAL(20, 4), + DataTypes.TIME(0), + DataTypes.TIME(3), + DataTypes.TIME(6), + DataTypes.TIMESTAMP(0), + DataTypes.TIMESTAMP(3), + DataTypes.TIMESTAMP(6), + DataTypes.TIMESTAMP_LTZ(0), + DataTypes.TIMESTAMP_LTZ(3), + DataTypes.TIMESTAMP_LTZ(6), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE() + }, + new String[] { + "id", + "decimal_c0", + "decimal_c1", + "decimal_c2", + "time_c", + "time_3_c", + "time_6_c", + "datetime_c", + "datetime3_c", + "datetime6_c", + "timestamp_c", + "timestamp3_c", + "timestamp6_c", + "float_c0", + "float_c1", + "float_c2", + "real_c0", + "real_c1", + "real_c2", + "double_c0", + "double_c1", + "double_c2", + "double_precision_c0", + "double_precision_c1", + "double_precision_c2" + })) + .build(); + assertThat(actualSchema).isEqualTo(expectedSchema); + } + + @Test + public void testMysql8PrecisionTypesSchema() { + fullTypesMySql8Database.createAndInitialize(); + + String[] tables = new String[] {"precision_types"}; + MySqlMetadataAccessor metadataAccessor = + getMetadataAccessor(tables, fullTypesMySql8Database); + + Schema actualSchema = + metadataAccessor.getTableSchema( + TableId.tableId( + fullTypesMySql8Database.getDatabaseName(), "precision_types")); + Schema expectedSchema = + Schema.newBuilder() + .primaryKey("id") + .fromRowDataType( + RowType.of( + new DataType[] { + DataTypes.DECIMAL(20, 0).notNull(), + DataTypes.DECIMAL(6, 2), + DataTypes.DECIMAL(9, 4), + DataTypes.DECIMAL(20, 4), + DataTypes.TIME(0), + DataTypes.TIME(3), + DataTypes.TIME(6), + DataTypes.TIMESTAMP(0), + DataTypes.TIMESTAMP(3), + DataTypes.TIMESTAMP(6), + DataTypes.TIMESTAMP_LTZ(0), + DataTypes.TIMESTAMP_LTZ(3), + DataTypes.TIMESTAMP_LTZ(6), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE(), + DataTypes.DOUBLE() + }, + new String[] { + "id", + "decimal_c0", + "decimal_c1", + "decimal_c2", + "time_c", + "time_3_c", + "time_6_c", + "datetime_c", + "datetime3_c", + "datetime6_c", + "timestamp_c", + "timestamp3_c", + "timestamp6_c", + "float_c0", + "float_c1", + "float_c2", + "real_c0", + "real_c1", + "real_c2", + "double_c0", + "double_c1", + "double_c2", + "double_precision_c0", + "double_precision_c1", + "double_precision_c2" + })) + .build(); + assertThat(actualSchema).isEqualTo(expectedSchema); + } + private void testAccessDatabaseAndTable(UniqueDatabase database) { database.createAndInitialize(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql index 08d25a714db..9699ed908c2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql @@ -140,6 +140,18 @@ CREATE TABLE precision_types timestamp_c TIMESTAMP(0) NULL, timestamp3_c TIMESTAMP(3) NULL, timestamp6_c TIMESTAMP(6) NULL, + float_c0 FLOAT(6, 0), + float_c1 FLOAT(20, 3), + float_c2 FLOAT(24, 12), + real_c0 REAL(6, 0), + real_c1 REAL(20, 3), + real_c2 REAL(24, 12), + double_c0 DOUBLE(6, 0), + double_c1 DOUBLE(20, 3), + double_c2 DOUBLE(24, 12), + double_precision_c0 DOUBLE PRECISION(6, 0), + double_precision_c1 DOUBLE PRECISION(20, 3), + double_precision_c2 DOUBLE PRECISION(24, 12), PRIMARY KEY (id) ) DEFAULT CHARSET=utf8; @@ -156,4 +168,16 @@ VALUES (DEFAULT, '2020-07-17 18:00:22', '2020-07-17 18:00', '2020-07-17 18:00:22', - '2020-07-17 18:00:22'); + '2020-07-17 18:00:22', + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql index 54c6c717053..8abe8868c07 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql @@ -141,9 +141,21 @@ CREATE TABLE precision_types datetime_c DATETIME(0), datetime3_c DATETIME(3), datetime6_c DATETIME(6), - timestamp_c TIMESTAMP(0), - timestamp3_c TIMESTAMP(3), - timestamp6_c TIMESTAMP(6), + timestamp_c TIMESTAMP(0) NULL, + timestamp3_c TIMESTAMP(3) NULL, + timestamp6_c TIMESTAMP(6) NULL, + float_c0 FLOAT(6, 0), + float_c1 FLOAT(20, 3), + float_c2 FLOAT(24, 12), + real_c0 REAL(6, 0), + real_c1 REAL(20, 3), + real_c2 REAL(24, 12), + double_c0 DOUBLE(6, 0), + double_c1 DOUBLE(20, 3), + double_c2 DOUBLE(24, 12), + double_precision_c0 DOUBLE PRECISION(6, 0), + double_precision_c1 DOUBLE PRECISION(20, 3), + double_precision_c2 DOUBLE PRECISION(24, 12), PRIMARY KEY (id) ) DEFAULT CHARSET=utf8; @@ -160,4 +172,16 @@ VALUES (DEFAULT, '2020-07-17 18:00:22', '2020-07-17 18:00', '2020-07-17 18:00:22', - '2020-07-17 18:00:22'); + '2020-07-17 18:00:22', + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37);