Skip to content

Commit 4ca398e

Browse files
author
dujie
committed
Merge remote-tracking branch 'origin/test_1.12_5.0_di' into conflict_1.12_5.0
# Conflicts: # flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/inputformat/BinlogInputFormatBuilder.java # flinkx-connectors/flinkx-connector-doris/src/main/java/com/dtstack/flinkx/connector/doris/rest/DorisLoadClient.java # flinkx-connectors/flinkx-connector-kafka/pom.xml # flinkx-connectors/flinkx-connector-kafka/src/main/java/com/dtstack/flinkx/connector/kafka/serialization/ticdc/TicdcDeserializationSchema.java # flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormat.java # flinkx-dirtydata-collectors/flinkx-dirtydata-collector-log/src/main/java/com/dtstack/flinkx/dirty/log/LogDirtyDataCollector.java
2 parents 1d15638 + 7a2207d commit 4ca398e

File tree

115 files changed

+2737
-886
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

115 files changed

+2737
-886
lines changed
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
package com.alibaba.otter.canal.parse.inbound.mysql.ddl;
2+
3+
import com.alibaba.fastsql.sql.SQLUtils;
4+
import com.alibaba.fastsql.sql.ast.SQLExpr;
5+
import com.alibaba.fastsql.sql.ast.SQLStatement;
6+
import com.alibaba.fastsql.sql.ast.expr.SQLIdentifierExpr;
7+
import com.alibaba.fastsql.sql.ast.expr.SQLPropertyExpr;
8+
import com.alibaba.fastsql.sql.ast.statement.SQLAlterTableAddConstraint;
9+
import com.alibaba.fastsql.sql.ast.statement.SQLAlterTableAddIndex;
10+
import com.alibaba.fastsql.sql.ast.statement.SQLAlterTableDropConstraint;
11+
import com.alibaba.fastsql.sql.ast.statement.SQLAlterTableDropIndex;
12+
import com.alibaba.fastsql.sql.ast.statement.SQLAlterTableDropKey;
13+
import com.alibaba.fastsql.sql.ast.statement.SQLAlterTableItem;
14+
import com.alibaba.fastsql.sql.ast.statement.SQLAlterTableRename;
15+
import com.alibaba.fastsql.sql.ast.statement.SQLAlterTableStatement;
16+
import com.alibaba.fastsql.sql.ast.statement.SQLConstraint;
17+
import com.alibaba.fastsql.sql.ast.statement.SQLCreateDatabaseStatement;
18+
import com.alibaba.fastsql.sql.ast.statement.SQLCreateIndexStatement;
19+
import com.alibaba.fastsql.sql.ast.statement.SQLCreateTableStatement;
20+
import com.alibaba.fastsql.sql.ast.statement.SQLCreateViewStatement;
21+
import com.alibaba.fastsql.sql.ast.statement.SQLDeleteStatement;
22+
import com.alibaba.fastsql.sql.ast.statement.SQLDropDatabaseStatement;
23+
import com.alibaba.fastsql.sql.ast.statement.SQLDropIndexStatement;
24+
import com.alibaba.fastsql.sql.ast.statement.SQLDropTableStatement;
25+
import com.alibaba.fastsql.sql.ast.statement.SQLDropViewStatement;
26+
import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
27+
import com.alibaba.fastsql.sql.ast.statement.SQLInsertStatement;
28+
import com.alibaba.fastsql.sql.ast.statement.SQLTableSource;
29+
import com.alibaba.fastsql.sql.ast.statement.SQLTruncateStatement;
30+
import com.alibaba.fastsql.sql.ast.statement.SQLUnique;
31+
import com.alibaba.fastsql.sql.ast.statement.SQLUpdateStatement;
32+
import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlRenameTableStatement;
33+
import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlRenameTableStatement.Item;
34+
import com.alibaba.fastsql.sql.parser.ParserException;
35+
import com.alibaba.fastsql.util.JdbcConstants;
36+
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
37+
import org.apache.commons.lang.StringUtils;
38+
39+
import java.util.ArrayList;
40+
import java.util.Arrays;
41+
import java.util.List;
42+
43+
/**
44+
* @author agapple 2017年7月27日 下午4:05:34
45+
* @since 1.0.25
46+
*/
47+
public class DruidDdlParser {
48+
49+
public static List<DdlResult> parse(String queryString, String schmeaName) {
50+
List<SQLStatement> stmtList = null;
51+
try {
52+
stmtList = SQLUtils.parseStatements(queryString, JdbcConstants.MYSQL, false);
53+
} catch (ParserException e) {
54+
// 可能存在一些SQL是不支持的,比如存储过程
55+
DdlResult ddlResult = new DdlResult();
56+
ddlResult.setType(EventType.QUERY);
57+
return Arrays.asList(ddlResult);
58+
}
59+
60+
List<DdlResult> ddlResults = new ArrayList<DdlResult>();
61+
for (SQLStatement statement : stmtList) {
62+
if (statement instanceof SQLCreateTableStatement) {
63+
DdlResult ddlResult = new DdlResult();
64+
SQLCreateTableStatement createTable = (SQLCreateTableStatement) statement;
65+
processName(ddlResult, schmeaName, createTable.getName(), false);
66+
ddlResult.setType(EventType.CREATE);
67+
ddlResults.add(ddlResult);
68+
} else if (statement instanceof SQLAlterTableStatement) {
69+
SQLAlterTableStatement alterTable = (SQLAlterTableStatement) statement;
70+
for (SQLAlterTableItem item : alterTable.getItems()) {
71+
if (item instanceof SQLAlterTableRename) {
72+
DdlResult ddlResult = new DdlResult();
73+
processName(ddlResult, schmeaName, alterTable.getName(), true);
74+
processName(
75+
ddlResult,
76+
schmeaName,
77+
((SQLAlterTableRename) item).getToName(),
78+
false);
79+
ddlResult.setType(EventType.RENAME);
80+
ddlResults.add(ddlResult);
81+
} else if (item instanceof SQLAlterTableAddIndex) {
82+
DdlResult ddlResult = new DdlResult();
83+
processName(ddlResult, schmeaName, alterTable.getName(), false);
84+
ddlResult.setType(EventType.CINDEX);
85+
ddlResults.add(ddlResult);
86+
} else if (item instanceof SQLAlterTableDropIndex
87+
|| item instanceof SQLAlterTableDropKey) {
88+
DdlResult ddlResult = new DdlResult();
89+
processName(ddlResult, schmeaName, alterTable.getName(), false);
90+
ddlResult.setType(EventType.DINDEX);
91+
ddlResults.add(ddlResult);
92+
} else if (item instanceof SQLAlterTableAddConstraint) {
93+
DdlResult ddlResult = new DdlResult();
94+
processName(ddlResult, schmeaName, alterTable.getName(), false);
95+
SQLConstraint constraint =
96+
((SQLAlterTableAddConstraint) item).getConstraint();
97+
if (constraint instanceof SQLUnique) {
98+
ddlResult.setType(EventType.CINDEX);
99+
ddlResults.add(ddlResult);
100+
}
101+
} else if (item instanceof SQLAlterTableDropConstraint) {
102+
DdlResult ddlResult = new DdlResult();
103+
processName(ddlResult, schmeaName, alterTable.getName(), false);
104+
ddlResult.setType(EventType.DINDEX);
105+
ddlResults.add(ddlResult);
106+
} else {
107+
DdlResult ddlResult = new DdlResult();
108+
processName(ddlResult, schmeaName, alterTable.getName(), false);
109+
ddlResult.setType(EventType.ALTER);
110+
ddlResults.add(ddlResult);
111+
}
112+
}
113+
} else if (statement instanceof SQLDropTableStatement) {
114+
SQLDropTableStatement dropTable = (SQLDropTableStatement) statement;
115+
for (SQLExprTableSource tableSource : dropTable.getTableSources()) {
116+
DdlResult ddlResult = new DdlResult();
117+
processName(ddlResult, schmeaName, tableSource.getExpr(), false);
118+
ddlResult.setType(EventType.ERASE);
119+
ddlResults.add(ddlResult);
120+
}
121+
} else if (statement instanceof SQLCreateIndexStatement) {
122+
SQLCreateIndexStatement createIndex = (SQLCreateIndexStatement) statement;
123+
SQLTableSource tableSource = createIndex.getTable();
124+
DdlResult ddlResult = new DdlResult();
125+
processName(
126+
ddlResult, schmeaName, ((SQLExprTableSource) tableSource).getExpr(), false);
127+
ddlResult.setType(EventType.CINDEX);
128+
ddlResults.add(ddlResult);
129+
} else if (statement instanceof SQLDropIndexStatement) {
130+
SQLDropIndexStatement dropIndex = (SQLDropIndexStatement) statement;
131+
SQLExprTableSource tableSource = dropIndex.getTableName();
132+
DdlResult ddlResult = new DdlResult();
133+
processName(ddlResult, schmeaName, tableSource.getExpr(), false);
134+
ddlResult.setType(EventType.DINDEX);
135+
ddlResults.add(ddlResult);
136+
} else if (statement instanceof SQLTruncateStatement) {
137+
SQLTruncateStatement truncate = (SQLTruncateStatement) statement;
138+
for (SQLExprTableSource tableSource : truncate.getTableSources()) {
139+
DdlResult ddlResult = new DdlResult();
140+
processName(ddlResult, schmeaName, tableSource.getExpr(), false);
141+
ddlResult.setType(EventType.TRUNCATE);
142+
ddlResults.add(ddlResult);
143+
}
144+
} else if (statement instanceof MySqlRenameTableStatement) {
145+
MySqlRenameTableStatement rename = (MySqlRenameTableStatement) statement;
146+
for (Item item : rename.getItems()) {
147+
DdlResult ddlResult = new DdlResult();
148+
processName(ddlResult, schmeaName, item.getName(), true);
149+
processName(ddlResult, schmeaName, item.getTo(), false);
150+
ddlResult.setType(EventType.RENAME);
151+
ddlResults.add(ddlResult);
152+
}
153+
} else if (statement instanceof SQLInsertStatement) {
154+
DdlResult ddlResult = new DdlResult();
155+
SQLInsertStatement insert = (SQLInsertStatement) statement;
156+
processName(ddlResult, schmeaName, insert.getTableName(), false);
157+
ddlResult.setType(EventType.INSERT);
158+
ddlResults.add(ddlResult);
159+
} else if (statement instanceof SQLUpdateStatement) {
160+
DdlResult ddlResult = new DdlResult();
161+
SQLUpdateStatement update = (SQLUpdateStatement) statement;
162+
// 拿到的表名可能为null,比如update a,b set a.id=x
163+
processName(ddlResult, schmeaName, update.getTableName(), false);
164+
ddlResult.setType(EventType.UPDATE);
165+
ddlResults.add(ddlResult);
166+
} else if (statement instanceof SQLDeleteStatement) {
167+
DdlResult ddlResult = new DdlResult();
168+
SQLDeleteStatement delete = (SQLDeleteStatement) statement;
169+
// 拿到的表名可能为null,比如delete a,b from a where a.id = b.id
170+
processName(ddlResult, schmeaName, delete.getTableName(), false);
171+
ddlResult.setType(EventType.DELETE);
172+
ddlResults.add(ddlResult);
173+
} else if (statement instanceof SQLCreateDatabaseStatement) {
174+
SQLCreateDatabaseStatement create = (SQLCreateDatabaseStatement) statement;
175+
DdlResult ddlResult = new DdlResult();
176+
ddlResult.setType(EventType.QUERY);
177+
processName(ddlResult, create.getDatabaseName(), null, false);
178+
ddlResults.add(ddlResult);
179+
} else if (statement instanceof SQLDropDatabaseStatement) {
180+
SQLDropDatabaseStatement drop = (SQLDropDatabaseStatement) statement;
181+
DdlResult ddlResult = new DdlResult();
182+
ddlResult.setType(EventType.QUERY);
183+
processName(ddlResult, drop.getDatabaseName(), null, false);
184+
ddlResults.add(ddlResult);
185+
}
186+
187+
// -------add view operator
188+
else if (statement instanceof SQLDropViewStatement) {
189+
SQLDropViewStatement drop = (SQLDropViewStatement) statement;
190+
for (SQLExprTableSource tableSource : drop.getTableSources()) {
191+
DdlResult ddlResult = new DdlResult();
192+
processName(ddlResult, schmeaName, tableSource.getExpr(), false);
193+
ddlResult.setType(EventType.ERASE);
194+
ddlResults.add(ddlResult);
195+
}
196+
} else if (statement instanceof SQLCreateViewStatement) {
197+
DdlResult ddlResult = new DdlResult();
198+
SQLCreateViewStatement createView = (SQLCreateViewStatement) statement;
199+
processName(ddlResult, schmeaName, createView.getName(), false);
200+
ddlResult.setType(EventType.CREATE);
201+
ddlResults.add(ddlResult);
202+
}
203+
}
204+
205+
return ddlResults;
206+
}
207+
208+
private static void processName(
209+
DdlResult ddlResult, String schema, SQLExpr sqlName, boolean isOri) {
210+
if (sqlName == null) {
211+
if (StringUtils.isNotBlank(schema)) {
212+
ddlResult.setSchemaName(schema);
213+
}
214+
return;
215+
}
216+
217+
String table = null;
218+
if (sqlName instanceof SQLPropertyExpr) {
219+
SQLIdentifierExpr owner = (SQLIdentifierExpr) ((SQLPropertyExpr) sqlName).getOwner();
220+
schema = unescapeName(owner.getName());
221+
table = unescapeName(((SQLPropertyExpr) sqlName).getName());
222+
} else if (sqlName instanceof SQLIdentifierExpr) {
223+
table = unescapeName(((SQLIdentifierExpr) sqlName).getName());
224+
}
225+
226+
if (isOri) {
227+
ddlResult.setOriSchemaName(schema);
228+
ddlResult.setOriTableName(table);
229+
} else {
230+
ddlResult.setSchemaName(schema);
231+
ddlResult.setTableName(table);
232+
}
233+
}
234+
235+
public static String unescapeName(String name) {
236+
if (name != null && name.length() > 2) {
237+
char c0 = name.charAt(0);
238+
char x0 = name.charAt(name.length() - 1);
239+
if ((c0 == '"' && x0 == '"') || (c0 == '`' && x0 == '`')) {
240+
return name.substring(1, name.length() - 1);
241+
}
242+
}
243+
244+
return name;
245+
}
246+
247+
public static String unescapeQuotaName(String name) {
248+
if (name != null && name.length() > 2) {
249+
char c0 = name.charAt(0);
250+
char x0 = name.charAt(name.length() - 1);
251+
if (c0 == '\'' && x0 == '\'') {
252+
return name.substring(1, name.length() - 1);
253+
}
254+
}
255+
256+
return name;
257+
}
258+
}

flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.flink.table.data.RowData;
3939
import org.apache.flink.types.RowKind;
4040

41+
import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DdlResult;
42+
import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
4143
import com.alibaba.otter.canal.protocol.CanalEntry;
4244
import org.apache.commons.collections.CollectionUtils;
4345

@@ -48,6 +50,7 @@
4850
import java.util.List;
4951
import java.util.Locale;
5052
import java.util.Map;
53+
import java.util.stream.Collectors;
5154

5255
import static com.dtstack.flinkx.constants.CDCConstantValue.AFTER;
5356
import static com.dtstack.flinkx.constants.CDCConstantValue.AFTER_;
@@ -83,8 +86,70 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
8386

8487
if (rowChange.getIsDdl()) {
8588
// 处理 ddl rowChange
86-
DdlRowData ddlRowData = swapEventToDdlRowData(binlogEventRow);
87-
result.add(ddlRowData);
89+
if (rowChange.getEventType().equals(CanalEntry.EventType.ERASE)) {
90+
List<DdlResult> parse =
91+
DruidDdlParser.parse(
92+
binlogEventRow.getRowChange().getSql(),
93+
binlogEventRow.getRowChange().getDdlSchemaName());
94+
result.addAll(
95+
parse.stream()
96+
.map(
97+
i ->
98+
DdlRowDataBuilder.builder()
99+
.setDatabaseName(i.getSchemaName())
100+
.setTableName(i.getTableName())
101+
.setContent(
102+
"DROP TABLE "
103+
+ i.getSchemaName()
104+
+ "."
105+
+ i.getTableName())
106+
.setType(
107+
binlogEventRow
108+
.getRowChange()
109+
.getEventType()
110+
.name())
111+
.setLsn(
112+
String.valueOf(
113+
binlogEventRow
114+
.getExecuteTime()))
115+
.build())
116+
.collect(Collectors.toList()));
117+
118+
} else if (rowChange.getEventType().equals(CanalEntry.EventType.RENAME)) {
119+
List<DdlResult> parse =
120+
DruidDdlParser.parse(
121+
binlogEventRow.getRowChange().getSql(),
122+
binlogEventRow.getRowChange().getDdlSchemaName());
123+
result.addAll(
124+
parse.stream()
125+
.map(
126+
i ->
127+
DdlRowDataBuilder.builder()
128+
.setDatabaseName(i.getSchemaName())
129+
.setTableName(i.getTableName())
130+
.setContent(
131+
"RENAME TABLE "
132+
+ i.getOriSchemaName()
133+
+ "."
134+
+ i.getOriTableName()
135+
+ " TO "
136+
+ i.getSchemaName()
137+
+ "."
138+
+ i.getTableName())
139+
.setType(
140+
binlogEventRow
141+
.getRowChange()
142+
.getEventType()
143+
.name())
144+
.setLsn(
145+
String.valueOf(
146+
binlogEventRow
147+
.getExecuteTime()))
148+
.build())
149+
.collect(Collectors.toList()));
150+
} else {
151+
result.add(swapEventToDdlRowData(binlogEventRow));
152+
}
88153
}
89154

90155
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {

flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/inputformat/BinlogInputFormatBuilder.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -242,20 +242,6 @@ protected void checkFormat() {
242242
.append("];\n");
243243
}
244244

245-
// 判断是否是updrdb,如果是则获取updrdb数据节点连接信息和表engine信息
246-
try {
247-
BinlogUtil.getUpdrdbMessage(conn, binlogConf);
248-
} catch (FlinkxException e) {
249-
sb.append(e.getMessage());
250-
}
251-
252-
// updrdb支持多并发
253-
if (binlogConf.getParallelism() > 1 && !binlogConf.isUpdrdb()) {
254-
sb.append("binLog can not support channel bigger than 1, current channel is [")
255-
.append(binlogConf.getParallelism())
256-
.append("];\n");
257-
}
258-
259245
if (sb.length() > 0) {
260246
throw new IllegalArgumentException(sb.toString());
261247
}

0 commit comments

Comments
 (0)