Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,10 @@ source:
DOUBLE UNSIGNED ZEROFILL<br>
DOUBLE PRECISION<br>
DOUBLE PRECISION UNSIGNED<br>
DOUBLE PRECISION UNSIGNED ZEROFILL
DOUBLE PRECISION UNSIGNED ZEROFILL<br>
FLOAT(p, s)<br>
REAL(p, s)<br>
DOUBLE(p, s)
</td>
<td>DOUBLE</td>
<td></td>
Expand Down
5 changes: 4 additions & 1 deletion docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,10 @@ source:
DOUBLE UNSIGNED ZEROFILL<br>
DOUBLE PRECISION<br>
DOUBLE PRECISION UNSIGNED<br>
DOUBLE PRECISION UNSIGNED ZEROFILL
DOUBLE PRECISION UNSIGNED ZEROFILL<br>
FLOAT(p, s)<br>
REAL(p, s)<br>
DOUBLE(p, s)
</td>
<td>DOUBLE</td>
<td></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class MySqlTypeUtils {
private static final String MULTIPOLYGON = "MULTIPOLYGON";
private static final String MULTILINESTRING = "MULTILINESTRING";
private static final String UNKNOWN = "UNKNOWN";
private static final int FLOAT_LENGTH_UNSPECIFIED_FLAG = -1;

/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
Expand Down Expand Up @@ -164,7 +165,12 @@ private static DataType convertFromColumn(Column column) {
case FLOAT:
case FLOAT_UNSIGNED:
case FLOAT_UNSIGNED_ZEROFILL:
return DataTypes.FLOAT();
if (column.length() != FLOAT_LENGTH_UNSPECIFIED_FLAG) {
// For FLOAT types with length provided explicitly, treat it like DOUBLE
return DataTypes.DOUBLE();
} else {
return DataTypes.FLOAT();
}
case REAL:
case REAL_UNSIGNED:
case REAL_UNSIGNED_ZEROFILL:
Expand Down Expand Up @@ -236,7 +242,7 @@ private static DataType convertFromColumn(Column column) {
return DataTypes.ARRAY(DataTypes.STRING());
default:
throw new UnsupportedOperationException(
String.format("Don't support MySQL type '%s' yet.", typeName));
String.format("MySQL type '%s' is not supported yet.", typeName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,18 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.TIMESTAMP_LTZ(0));
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE());

Object[] expectedSnapshot =
new Object[] {
Expand All @@ -265,7 +276,19 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
2d,
3d,
5d,
7d,
11d,
13d,
17d,
19d,
23d,
29d,
31d,
37d
};

Object[] expectedStreamRecord =
Expand All @@ -282,7 +305,19 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
2d,
3d,
5d,
7d,
11d,
13d,
17d,
19d,
23d,
29d,
31d,
37d
};

database.createAndInitialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,156 @@ public void testMysql8AccessTimeTypesSchema() {
assertThat(actualSchema).isEqualTo(expectedSchema);
}

@Test
public void testMysql57PrecisionTypesSchema() {
fullTypesMySql57Database.createAndInitialize();

String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database);

Schema actualSchema =
metadataAccessor.getTableSchema(
TableId.tableId(
fullTypesMySql57Database.getDatabaseName(), "precision_types"));
Schema expectedSchema =
Schema.newBuilder()
.primaryKey("id")
.fromRowDataType(
RowType.of(
new DataType[] {
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.DECIMAL(6, 2),
DataTypes.DECIMAL(9, 4),
DataTypes.DECIMAL(20, 4),
DataTypes.TIME(0),
DataTypes.TIME(3),
DataTypes.TIME(6),
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE()
},
new String[] {
"id",
"decimal_c0",
"decimal_c1",
"decimal_c2",
"time_c",
"time_3_c",
"time_6_c",
"datetime_c",
"datetime3_c",
"datetime6_c",
"timestamp_c",
"timestamp3_c",
"timestamp6_c",
"float_c0",
"float_c1",
"float_c2",
"real_c0",
"real_c1",
"real_c2",
"double_c0",
"double_c1",
"double_c2",
"double_precision_c0",
"double_precision_c1",
"double_precision_c2"
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
}

@Test
public void testMysql8PrecisionTypesSchema() {
fullTypesMySql8Database.createAndInitialize();

String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql8Database);

Schema actualSchema =
metadataAccessor.getTableSchema(
TableId.tableId(
fullTypesMySql8Database.getDatabaseName(), "precision_types"));
Schema expectedSchema =
Schema.newBuilder()
.primaryKey("id")
.fromRowDataType(
RowType.of(
new DataType[] {
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.DECIMAL(6, 2),
DataTypes.DECIMAL(9, 4),
DataTypes.DECIMAL(20, 4),
DataTypes.TIME(0),
DataTypes.TIME(3),
DataTypes.TIME(6),
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE()
},
new String[] {
"id",
"decimal_c0",
"decimal_c1",
"decimal_c2",
"time_c",
"time_3_c",
"time_6_c",
"datetime_c",
"datetime3_c",
"datetime6_c",
"timestamp_c",
"timestamp3_c",
"timestamp6_c",
"float_c0",
"float_c1",
"float_c2",
"real_c0",
"real_c1",
"real_c2",
"double_c0",
"double_c1",
"double_c2",
"double_precision_c0",
"double_precision_c1",
"double_precision_c2"
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
}

private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ CREATE TABLE precision_types
timestamp_c TIMESTAMP(0) NULL,
timestamp3_c TIMESTAMP(3) NULL,
timestamp6_c TIMESTAMP(6) NULL,
float_c0 FLOAT(6, 0),
float_c1 FLOAT(20, 3),
float_c2 FLOAT(24, 12),
real_c0 REAL(6, 0),
real_c1 REAL(20, 3),
real_c2 REAL(24, 12),
double_c0 DOUBLE(6, 0),
double_c1 DOUBLE(20, 3),
double_c2 DOUBLE(24, 12),
double_precision_c0 DOUBLE PRECISION(6, 0),
double_precision_c1 DOUBLE PRECISION(20, 3),
double_precision_c2 DOUBLE PRECISION(24, 12),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

Expand All @@ -156,4 +168,16 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22',
'2020-07-17 18:00',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22');
'2020-07-17 18:00:22',
2,
3,
5,
7,
11,
13,
17,
19,
23,
29,
31,
37);
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,21 @@ CREATE TABLE precision_types
datetime_c DATETIME(0),
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP(0),
timestamp3_c TIMESTAMP(3),
timestamp6_c TIMESTAMP(6),
timestamp_c TIMESTAMP(0) NULL,
timestamp3_c TIMESTAMP(3) NULL,
timestamp6_c TIMESTAMP(6) NULL,
float_c0 FLOAT(6, 0),
float_c1 FLOAT(20, 3),
float_c2 FLOAT(24, 12),
real_c0 REAL(6, 0),
real_c1 REAL(20, 3),
real_c2 REAL(24, 12),
double_c0 DOUBLE(6, 0),
double_c1 DOUBLE(20, 3),
double_c2 DOUBLE(24, 12),
double_precision_c0 DOUBLE PRECISION(6, 0),
double_precision_c1 DOUBLE PRECISION(20, 3),
double_precision_c2 DOUBLE PRECISION(24, 12),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

Expand All @@ -160,4 +172,16 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22',
'2020-07-17 18:00',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22');
'2020-07-17 18:00:22',
2,
3,
5,
7,
11,
13,
17,
19,
23,
29,
31,
37);