Skip to content

Commit 06298ec

Browse files
authored
[BugFix][CDCSROUCE] Exception after submitting CDCSOURCE job to sync data from tables whose names start with digit(s) (#4493)
1 parent 6a3b892 commit 06298ec

File tree

5 files changed

+117
-30
lines changed

5 files changed

+117
-30
lines changed

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.dinky.cdc.AbstractSinkBuilder;
2424
import org.dinky.cdc.convert.DataTypeConverter;
2525
import org.dinky.cdc.utils.FlinkStatementUtil;
26+
import org.dinky.data.flink.table.FlinkTableObjectIdentifier;
2627
import org.dinky.data.model.Column;
2728
import org.dinky.data.model.FlinkCDCConfig;
2829
import org.dinky.data.model.Schema;
@@ -54,6 +55,8 @@
5455
import java.util.Map;
5556
import java.util.stream.Collectors;
5657

58+
import cn.hutool.core.util.StrUtil;
59+
5760
public abstract class AbstractSqlSinkBuilder extends AbstractSinkBuilder implements Serializable {
5861

5962
protected AbstractSqlSinkBuilder() {}
@@ -220,19 +223,20 @@ protected void executeCatalogStatement() {}
220223
* @return view name
221224
*/
222225
public static String replaceViewNameMiddleLineToUnderLine(String viewName) {
223-
if (!viewName.isEmpty() && viewName.contains("-")) {
226+
if (StrUtil.isNotBlank(viewName) && viewName.contains("-")) {
224227
logger.warn("the view name [{}] contains '-', replace '-' to '_' for flink use view name", viewName);
225228
return viewName.replaceAll("-", "_");
226229
}
227230
return viewName;
228231
}
229232

230-
protected List<Operation> createInsertOperations(Table table, String viewName, String tableName) {
231-
String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName, config);
233+
protected List<Operation> createInsertOperations(
234+
Table table, FlinkTableObjectIdentifier sourceTable, FlinkTableObjectIdentifier targetTable) {
235+
String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, targetTable, sourceTable, config);
232236
logger.info(cdcSqlInsert);
233237

234238
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
235-
logger.info("Create {} FlinkSQL insert into successful...", tableName);
239+
logger.info("Create {} FlinkSQL insert into successful...", targetTable);
236240
if (operations.isEmpty()) {
237241
return operations;
238242
}

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.dinky.cdc.SinkBuilder;
2323
import org.dinky.cdc.utils.FlinkStatementUtil;
24+
import org.dinky.data.flink.table.FlinkTableObjectIdentifier;
2425
import org.dinky.data.model.FlinkCDCConfig;
2526
import org.dinky.data.model.Table;
2627

@@ -44,47 +45,51 @@ private SQLSinkBuilder(FlinkCDCConfig config) {
4445
super(config);
4546
}
4647

47-
private String addSourceTableView(DataStream<Row> rowDataDataStream, Table table) {
48+
private FlinkTableObjectIdentifier addSourceTableView(DataStream<Row> rowDataDataStream, Table table) {
4849
// Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_
4950
String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());
5051

5152
customTableEnvironment.createTemporaryView(
5253
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
5354
logger.info("Create {} temporaryView successful...", viewName);
54-
return viewName;
55+
return FlinkTableObjectIdentifier.of(viewName);
5556
}
5657

5758
@Override
5859
protected void addTableSink(DataStream<Row> rowDataDataStream, Table table) {
59-
final String viewName = addSourceTableView(rowDataDataStream, table);
60+
final FlinkTableObjectIdentifier viewName = addSourceTableView(rowDataDataStream, table);
6061
final String sinkSchemaName = getSinkSchemaName(table);
6162
final String sinkTableName = getSinkTableName(table);
63+
final FlinkTableObjectIdentifier sinkTable = FlinkTableObjectIdentifier.of(sinkTableName);
6264

6365
// Multiple sinks and single sink
6466
if (CollectionUtils.isEmpty(config.getSinks())) {
65-
addSinkInsert(table, viewName, sinkTableName, sinkSchemaName, sinkTableName);
67+
addSinkInsert(table, viewName, sinkTable, sinkSchemaName, sinkTable);
6668
} else {
6769
for (int index = 0; index < config.getSinks().size(); index++) {
68-
String tableName = sinkTableName;
70+
FlinkTableObjectIdentifier newSinkTable = sinkTable;
6971
if (config.getSinks().size() != 1) {
70-
tableName = sinkTableName + "_" + index;
72+
newSinkTable = FlinkTableObjectIdentifier.of(sinkTable + "_" + index);
7173
}
7274

7375
config.setSink(config.getSinks().get(index));
74-
addSinkInsert(table, viewName, tableName, sinkSchemaName, sinkTableName);
76+
addSinkInsert(table, viewName, newSinkTable, sinkSchemaName, sinkTable);
7577
}
7678
}
7779
}
7880

7981
private List<Operation> addSinkInsert(
80-
Table table, String viewName, String tableName, String sinkSchemaName, String sinkTableName) {
82+
Table table,
83+
FlinkTableObjectIdentifier sourceTable,
84+
FlinkTableObjectIdentifier targetTable,
85+
String sinkSchemaName,
86+
FlinkTableObjectIdentifier sinkTable) {
8187
String pkList = StringUtils.join(getPKList(table), ".");
82-
String flinkDDL =
83-
FlinkStatementUtil.getFlinkDDL(table, tableName, config, sinkSchemaName, sinkTableName, pkList);
88+
String flinkDDL = FlinkStatementUtil.getFlinkDDL(table, targetTable, config, sinkSchemaName, sinkTable, pkList);
8489
logger.info(flinkDDL);
8590
customTableEnvironment.executeSql(flinkDDL);
86-
logger.info("Create {} FlinkSQL DDL successful...", tableName);
87-
return createInsertOperations(table, viewName, tableName);
91+
logger.info("Create {} FlinkSQL DDL successful...", targetTable);
92+
return createInsertOperations(table, sourceTable, targetTable);
8893
}
8994

9095
@Override

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.dinky.cdc.SinkBuilder;
2323
import org.dinky.cdc.sql.AbstractSqlSinkBuilder;
2424
import org.dinky.cdc.utils.FlinkStatementUtil;
25+
import org.dinky.data.flink.table.FlinkTableObjectIdentifier;
2526
import org.dinky.data.model.FlinkCDCConfig;
2627
import org.dinky.data.model.Table;
2728

@@ -46,15 +47,17 @@ public void addTableSink(DataStream<Row> rowDataDataStream, Table table) {
4647
String catalogName = config.getSink().get("catalog.name");
4748
String sinkSchemaName = getSinkSchemaName(table);
4849
String tableName = getSinkTableName(table);
49-
String sinkTableName = catalogName + ".`" + sinkSchemaName + "`.`" + tableName + "`";
5050
// Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_
5151
String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());
5252

5353
customTableEnvironment.createTemporaryView(
5454
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
5555
logger.info("Create {} temporaryView successful...", viewName);
5656

57-
createInsertOperations(table, viewName, sinkTableName);
57+
createInsertOperations(
58+
table,
59+
FlinkTableObjectIdentifier.of(viewName),
60+
FlinkTableObjectIdentifier.of(catalogName, sinkSchemaName, tableName));
5861
}
5962

6063
@Override

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.dinky.cdc.utils;
2121

22+
import org.dinky.data.flink.table.FlinkTableObjectIdentifier;
2223
import org.dinky.data.model.Column;
2324
import org.dinky.data.model.FlinkCDCConfig;
2425
import org.dinky.data.model.Table;
@@ -34,9 +35,13 @@ public class FlinkStatementUtil {
3435

3536
private FlinkStatementUtil() {}
3637

37-
public static String getCDCInsertSql(Table table, String targetName, String sourceName, FlinkCDCConfig config) {
38+
public static String getCDCInsertSql(
39+
Table table,
40+
FlinkTableObjectIdentifier targetTable,
41+
FlinkTableObjectIdentifier sourceTable,
42+
FlinkCDCConfig config) {
3843
StringBuilder sb = new StringBuilder("INSERT INTO ");
39-
sb.append(targetName);
44+
sb.append(targetTable.toTablePath());
4045
sb.append(" SELECT\n");
4146
for (int i = 0; i < table.getColumns().size(); i++) {
4247
sb.append(" ");
@@ -45,9 +50,8 @@ public static String getCDCInsertSql(Table table, String targetName, String sour
4550
}
4651
sb.append(getColumnProcessing(table.getColumns().get(i), config)).append(" \n");
4752
}
48-
sb.append(" FROM `");
49-
sb.append(sourceName);
50-
sb.append("`");
53+
sb.append(" FROM ");
54+
sb.append(sourceTable.toTablePath());
5155
return sb.toString();
5256
}
5357

@@ -65,19 +69,19 @@ public static String getColumnProcessing(Column column, FlinkCDCConfig config) {
6569

6670
public static String getFlinkDDL(
6771
Table table,
68-
String tableName,
72+
FlinkTableObjectIdentifier flinkTable,
6973
FlinkCDCConfig config,
7074
String sinkSchemaName,
71-
String sinkTableName,
75+
FlinkTableObjectIdentifier sinkTableName,
7276
String pkList) {
7377
StringBuilder sb = new StringBuilder();
7478
if (Integer.parseInt(EnvironmentInformation.getVersion().split("\\.")[1]) < 13) {
75-
sb.append("CREATE TABLE `");
79+
sb.append("CREATE TABLE ");
7680
} else {
77-
sb.append("CREATE TABLE IF NOT EXISTS `");
81+
sb.append("CREATE TABLE IF NOT EXISTS ");
7882
}
79-
sb.append(tableName);
80-
sb.append("` (\n");
83+
sb.append(flinkTable.toTablePath());
84+
sb.append(" (\n");
8185
List<String> pks = new ArrayList<>();
8286
for (int i = 0; i < table.getColumns().size(); i++) {
8387
String type =
@@ -110,7 +114,7 @@ public static String getFlinkDDL(
110114
sb.append(pksb);
111115
}
112116
sb.append(") WITH (\n");
113-
sb.append(getSinkConfigurationString(config, sinkSchemaName, sinkTableName, pkList));
117+
sb.append(getSinkConfigurationString(config, sinkSchemaName, sinkTableName.getObjectName(), pkList));
114118
sb.append(")\n");
115119
return sb.toString();
116120
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one or more
4+
* contributor license agreements. See the NOTICE file distributed with
5+
* this work for additional information regarding copyright ownership.
6+
* The ASF licenses this file to You under the Apache License, Version 2.0
7+
* (the "License"); you may not use this file except in compliance with
8+
* the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.dinky.data.flink.table;
21+
22+
import lombok.Getter;
23+
24+
@Getter
25+
public class FlinkTableObjectIdentifier {
26+
private final String catalogName;
27+
private final String databaseName;
28+
private final String objectName;
29+
30+
public static FlinkTableObjectIdentifier of(String catalogName, String databaseName, String objectName) {
31+
return new FlinkTableObjectIdentifier(catalogName, databaseName, objectName);
32+
}
33+
34+
public static FlinkTableObjectIdentifier of(String objectName) {
35+
return of(null, null, objectName);
36+
}
37+
38+
private FlinkTableObjectIdentifier(String catalogName, String databaseName, String objectName) {
39+
this.catalogName = catalogName;
40+
this.databaseName = databaseName;
41+
this.objectName = objectName;
42+
if (objectName == null) {
43+
throw new IllegalArgumentException("objectName can not be null");
44+
}
45+
}
46+
47+
/**
48+
*
49+
* @return catalogName.`databaseName`.`objectName`
50+
*/
51+
public String toTablePath() {
52+
StringBuilder sb = new StringBuilder();
53+
if (catalogName != null) {
54+
sb.append(catalogName);
55+
}
56+
if (databaseName != null) {
57+
sb.append(".`");
58+
sb.append(databaseName);
59+
sb.append("`.");
60+
}
61+
sb.append("`");
62+
sb.append(objectName);
63+
sb.append("`");
64+
return sb.toString();
65+
}
66+
67+
@Override
68+
public String toString() {
69+
return toTablePath();
70+
}
71+
}

0 commit comments

Comments
 (0)