Skip to content

Commit d026554

Browse files
committed
[FLINK-36408] Fix MySQL pipeline connector failed to parse FLOAT type with precision
Signed-off-by: yuxiqian <[email protected]>
1 parent a01e720 commit d026554

File tree

7 files changed

+257
-12
lines changed

7 files changed

+257
-12
lines changed

docs/content.zh/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,10 @@ source:
382382
DOUBLE UNSIGNED ZEROFILL<br>
383383
DOUBLE PRECISION<br>
384384
DOUBLE PRECISION UNSIGNED<br>
385-
DOUBLE PRECISION UNSIGNED ZEROFILL
385+
DOUBLE PRECISION UNSIGNED ZEROFILL<br>
386+
FLOAT(p, s)<br>
387+
REAL(p, s)<br>
388+
DOUBLE(p, s)
386389
</td>
387390
<td>DOUBLE</td>
388391
<td></td>

docs/content/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,10 @@ source:
392392
DOUBLE UNSIGNED ZEROFILL<br>
393393
DOUBLE PRECISION<br>
394394
DOUBLE PRECISION UNSIGNED<br>
395-
DOUBLE PRECISION UNSIGNED ZEROFILL
395+
DOUBLE PRECISION UNSIGNED ZEROFILL<br>
396+
FLOAT(p, s)<br>
397+
REAL(p, s)<br>
398+
DOUBLE(p, s)
396399
</td>
397400
<td>DOUBLE</td>
398401
<td></td>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public class MySqlTypeUtils {
107107
private static final String MULTIPOLYGON = "MULTIPOLYGON";
108108
private static final String MULTILINESTRING = "MULTILINESTRING";
109109
private static final String UNKNOWN = "UNKNOWN";
110+
private static final int FLOAT_LENGTH_UNSPECIFIED_FLAG = -1;
110111

111112
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
112113
public static DataType fromDbzColumn(Column column) {
@@ -164,7 +165,12 @@ private static DataType convertFromColumn(Column column) {
164165
case FLOAT:
165166
case FLOAT_UNSIGNED:
166167
case FLOAT_UNSIGNED_ZEROFILL:
167-
return DataTypes.FLOAT();
168+
if (column.length() != FLOAT_LENGTH_UNSPECIFIED_FLAG) {
169+
// For FLOAT types with length provided explicitly, treat it like DOUBLE
170+
return DataTypes.DOUBLE();
171+
} else {
172+
return DataTypes.FLOAT();
173+
}
168174
case REAL:
169175
case REAL_UNSIGNED:
170176
case REAL_UNSIGNED_ZEROFILL:
@@ -236,7 +242,7 @@ private static DataType convertFromColumn(Column column) {
236242
return DataTypes.ARRAY(DataTypes.STRING());
237243
default:
238244
throw new UnsupportedOperationException(
239-
String.format("Don't support MySQL type '%s' yet.", typeName));
245+
String.format("MySQL type '%s' is not supported yet.", typeName));
240246
}
241247
}
242248

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,18 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
249249
DataTypes.TIMESTAMP_LTZ(0),
250250
DataTypes.TIMESTAMP_LTZ(3),
251251
DataTypes.TIMESTAMP_LTZ(6),
252-
DataTypes.TIMESTAMP_LTZ(0));
252+
DataTypes.DOUBLE(),
253+
DataTypes.DOUBLE(),
254+
DataTypes.DOUBLE(),
255+
DataTypes.DOUBLE(),
256+
DataTypes.DOUBLE(),
257+
DataTypes.DOUBLE(),
258+
DataTypes.DOUBLE(),
259+
DataTypes.DOUBLE(),
260+
DataTypes.DOUBLE(),
261+
DataTypes.DOUBLE(),
262+
DataTypes.DOUBLE(),
263+
DataTypes.DOUBLE());
253264

254265
Object[] expectedSnapshot =
255266
new Object[] {
@@ -265,7 +276,19 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
265276
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
266277
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
267278
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
268-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
279+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
280+
2d,
281+
3d,
282+
5d,
283+
7d,
284+
11d,
285+
13d,
286+
17d,
287+
19d,
288+
23d,
289+
29d,
290+
31d,
291+
37d
269292
};
270293

271294
Object[] expectedStreamRecord =
@@ -282,7 +305,19 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
282305
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
283306
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
284307
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
285-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
308+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
309+
2d,
310+
3d,
311+
5d,
312+
7d,
313+
11d,
314+
13d,
315+
17d,
316+
19d,
317+
23d,
318+
29d,
319+
31d,
320+
37d
286321
};
287322

288323
database.createAndInitialize();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,156 @@ public void testMysql8AccessTimeTypesSchema() {
207207
assertThat(actualSchema).isEqualTo(expectedSchema);
208208
}
209209

210+
@Test
211+
public void testMysql57PrecisionTypesSchema() {
212+
fullTypesMySql57Database.createAndInitialize();
213+
214+
String[] tables = new String[] {"precision_types"};
215+
MySqlMetadataAccessor metadataAccessor =
216+
getMetadataAccessor(tables, fullTypesMySql57Database);
217+
218+
Schema actualSchema =
219+
metadataAccessor.getTableSchema(
220+
TableId.tableId(
221+
fullTypesMySql57Database.getDatabaseName(), "precision_types"));
222+
Schema expectedSchema =
223+
Schema.newBuilder()
224+
.primaryKey("id")
225+
.fromRowDataType(
226+
RowType.of(
227+
new DataType[] {
228+
DataTypes.DECIMAL(20, 0).notNull(),
229+
DataTypes.DECIMAL(6, 2),
230+
DataTypes.DECIMAL(9, 4),
231+
DataTypes.DECIMAL(20, 4),
232+
DataTypes.TIME(0),
233+
DataTypes.TIME(3),
234+
DataTypes.TIME(6),
235+
DataTypes.TIMESTAMP(0),
236+
DataTypes.TIMESTAMP(3),
237+
DataTypes.TIMESTAMP(6),
238+
DataTypes.TIMESTAMP_LTZ(0),
239+
DataTypes.TIMESTAMP_LTZ(3),
240+
DataTypes.TIMESTAMP_LTZ(6),
241+
DataTypes.DOUBLE(),
242+
DataTypes.DOUBLE(),
243+
DataTypes.DOUBLE(),
244+
DataTypes.DOUBLE(),
245+
DataTypes.DOUBLE(),
246+
DataTypes.DOUBLE(),
247+
DataTypes.DOUBLE(),
248+
DataTypes.DOUBLE(),
249+
DataTypes.DOUBLE(),
250+
DataTypes.DOUBLE(),
251+
DataTypes.DOUBLE(),
252+
DataTypes.DOUBLE()
253+
},
254+
new String[] {
255+
"id",
256+
"decimal_c0",
257+
"decimal_c1",
258+
"decimal_c2",
259+
"time_c",
260+
"time_3_c",
261+
"time_6_c",
262+
"datetime_c",
263+
"datetime3_c",
264+
"datetime6_c",
265+
"timestamp_c",
266+
"timestamp3_c",
267+
"timestamp6_c",
268+
"float_c0",
269+
"float_c1",
270+
"float_c2",
271+
"real_c0",
272+
"real_c1",
273+
"real_c2",
274+
"double_c0",
275+
"double_c1",
276+
"double_c2",
277+
"double_precision_c0",
278+
"double_precision_c1",
279+
"double_precision_c2"
280+
}))
281+
.build();
282+
assertThat(actualSchema).isEqualTo(expectedSchema);
283+
}
284+
285+
@Test
286+
public void testMysql8PrecisionTypesSchema() {
287+
fullTypesMySql8Database.createAndInitialize();
288+
289+
String[] tables = new String[] {"precision_types"};
290+
MySqlMetadataAccessor metadataAccessor =
291+
getMetadataAccessor(tables, fullTypesMySql8Database);
292+
293+
Schema actualSchema =
294+
metadataAccessor.getTableSchema(
295+
TableId.tableId(
296+
fullTypesMySql8Database.getDatabaseName(), "precision_types"));
297+
Schema expectedSchema =
298+
Schema.newBuilder()
299+
.primaryKey("id")
300+
.fromRowDataType(
301+
RowType.of(
302+
new DataType[] {
303+
DataTypes.DECIMAL(20, 0).notNull(),
304+
DataTypes.DECIMAL(6, 2),
305+
DataTypes.DECIMAL(9, 4),
306+
DataTypes.DECIMAL(20, 4),
307+
DataTypes.TIME(0),
308+
DataTypes.TIME(3),
309+
DataTypes.TIME(6),
310+
DataTypes.TIMESTAMP(0),
311+
DataTypes.TIMESTAMP(3),
312+
DataTypes.TIMESTAMP(6),
313+
DataTypes.TIMESTAMP_LTZ(0),
314+
DataTypes.TIMESTAMP_LTZ(3),
315+
DataTypes.TIMESTAMP_LTZ(6),
316+
DataTypes.DOUBLE(),
317+
DataTypes.DOUBLE(),
318+
DataTypes.DOUBLE(),
319+
DataTypes.DOUBLE(),
320+
DataTypes.DOUBLE(),
321+
DataTypes.DOUBLE(),
322+
DataTypes.DOUBLE(),
323+
DataTypes.DOUBLE(),
324+
DataTypes.DOUBLE(),
325+
DataTypes.DOUBLE(),
326+
DataTypes.DOUBLE(),
327+
DataTypes.DOUBLE()
328+
},
329+
new String[] {
330+
"id",
331+
"decimal_c0",
332+
"decimal_c1",
333+
"decimal_c2",
334+
"time_c",
335+
"time_3_c",
336+
"time_6_c",
337+
"datetime_c",
338+
"datetime3_c",
339+
"datetime6_c",
340+
"timestamp_c",
341+
"timestamp3_c",
342+
"timestamp6_c",
343+
"float_c0",
344+
"float_c1",
345+
"float_c2",
346+
"real_c0",
347+
"real_c1",
348+
"real_c2",
349+
"double_c0",
350+
"double_c1",
351+
"double_c2",
352+
"double_precision_c0",
353+
"double_precision_c1",
354+
"double_precision_c2"
355+
}))
356+
.build();
357+
assertThat(actualSchema).isEqualTo(expectedSchema);
358+
}
359+
210360
private void testAccessDatabaseAndTable(UniqueDatabase database) {
211361
database.createAndInitialize();
212362

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,18 @@ CREATE TABLE precision_types
140140
timestamp_c TIMESTAMP(0) NULL,
141141
timestamp3_c TIMESTAMP(3) NULL,
142142
timestamp6_c TIMESTAMP(6) NULL,
143+
float_c0 FLOAT(6, 0),
144+
float_c1 FLOAT(20, 3),
145+
float_c2 FLOAT(24, 12),
146+
real_c0 REAL(6, 0),
147+
real_c1 REAL(20, 3),
148+
real_c2 REAL(24, 12),
149+
double_c0 DOUBLE(6, 0),
150+
double_c1 DOUBLE(20, 3),
151+
double_c2 DOUBLE(24, 12),
152+
double_precision_c0 DOUBLE PRECISION(6, 0),
153+
double_precision_c1 DOUBLE PRECISION(20, 3),
154+
double_precision_c2 DOUBLE PRECISION(24, 12),
143155
PRIMARY KEY (id)
144156
) DEFAULT CHARSET=utf8;
145157

@@ -156,4 +168,16 @@ VALUES (DEFAULT,
156168
'2020-07-17 18:00:22',
157169
'2020-07-17 18:00',
158170
'2020-07-17 18:00:22',
159-
'2020-07-17 18:00:22');
171+
'2020-07-17 18:00:22',
172+
2,
173+
3,
174+
5,
175+
7,
176+
11,
177+
13,
178+
17,
179+
19,
180+
23,
181+
29,
182+
31,
183+
37);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,21 @@ CREATE TABLE precision_types
141141
datetime_c DATETIME(0),
142142
datetime3_c DATETIME(3),
143143
datetime6_c DATETIME(6),
144-
timestamp_c TIMESTAMP(0),
145-
timestamp3_c TIMESTAMP(3),
146-
timestamp6_c TIMESTAMP(6),
144+
timestamp_c TIMESTAMP(0) NULL,
145+
timestamp3_c TIMESTAMP(3) NULL,
146+
timestamp6_c TIMESTAMP(6) NULL,
147+
float_c0 FLOAT(6, 0),
148+
float_c1 FLOAT(20, 3),
149+
float_c2 FLOAT(24, 12),
150+
real_c0 REAL(6, 0),
151+
real_c1 REAL(20, 3),
152+
real_c2 REAL(24, 12),
153+
double_c0 DOUBLE(6, 0),
154+
double_c1 DOUBLE(20, 3),
155+
double_c2 DOUBLE(24, 12),
156+
double_precision_c0 DOUBLE PRECISION(6, 0),
157+
double_precision_c1 DOUBLE PRECISION(20, 3),
158+
double_precision_c2 DOUBLE PRECISION(24, 12),
147159
PRIMARY KEY (id)
148160
) DEFAULT CHARSET=utf8;
149161

@@ -160,4 +172,16 @@ VALUES (DEFAULT,
160172
'2020-07-17 18:00:22',
161173
'2020-07-17 18:00',
162174
'2020-07-17 18:00:22',
163-
'2020-07-17 18:00:22');
175+
'2020-07-17 18:00:22',
176+
2,
177+
3,
178+
5,
179+
7,
180+
11,
181+
13,
182+
17,
183+
19,
184+
23,
185+
29,
186+
31,
187+
37);

0 commit comments

Comments
 (0)