Skip to content

Commit 6b504ff

Browse files
committed
Merge branch '1.5.0_dev_field_as' into 'v1.5.0_dev'
内部字段转换 See merge request !50
2 parents add0cab + ca7a71e commit 6b504ff

File tree

3 files changed

+104
-28
lines changed

3 files changed

+104
-28
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.flink.table.api.Table;
77
import org.apache.flink.table.api.TableEnvironment;
88
import org.apache.flink.table.api.TableException;
9+
import org.apache.flink.table.api.ValidationException;
910
import org.apache.flink.table.api.java.StreamTableEnvironment;
1011
import org.apache.flink.table.calcite.FlinkPlannerImpl;
1112
import org.apache.flink.table.plan.logical.LogicalRelNode;
@@ -36,17 +37,23 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
3637
Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
3738
String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
3839

40+
Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
41+
method.setAccessible(true);
42+
43+
TableSinkTable targetTable = (TableSinkTable) method.invoke(tableEnv, targetTableName);
44+
String[] fieldNames = targetTable.tableSink().getFieldNames();
45+
46+
Table newTable = null;
47+
3948
try {
40-
Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
41-
method.setAccessible(true);
42-
43-
TableSinkTable targetTable = (TableSinkTable) method.invoke(tableEnv, targetTableName);
44-
String[] fieldNames = targetTable.tableSink().getFieldNames();
45-
Table newTable = queryResult.select(String.join(",", fieldNames));
46-
// insert query result into sink table
47-
tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
49+
newTable = queryResult.select(String.join(",", fieldNames));
4850
} catch (Exception e) {
49-
throw e;
51+
throw new ValidationException(
52+
"Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
53+
"Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
54+
"TableSink schema: " + String.join(",", fieldNames));
5055
}
56+
57+
tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
5158
}
5259
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 87 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,16 @@
1717
*/
1818

1919

20-
2120
package com.dtstack.flink.sql.side;
2221

23-
import com.dtstack.flink.sql.Main;
2422
import com.dtstack.flink.sql.enums.ECacheType;
2523
import com.dtstack.flink.sql.exec.FlinkSQLExec;
2624
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
2725
import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
2826
import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
2927
import com.dtstack.flink.sql.util.ClassUtil;
3028
import com.dtstack.flink.sql.util.ParseUtils;
29+
import org.apache.calcite.sql.SqlAsOperator;
3130
import org.apache.calcite.sql.SqlBasicCall;
3231
import org.apache.calcite.sql.SqlDataTypeSpec;
3332
import org.apache.calcite.sql.SqlIdentifier;
@@ -37,6 +36,7 @@
3736
import org.apache.calcite.sql.SqlLiteral;
3837
import org.apache.calcite.sql.SqlNode;
3938
import org.apache.calcite.sql.SqlNodeList;
39+
import org.apache.calcite.sql.SqlOperator;
4040
import org.apache.calcite.sql.SqlSelect;
4141
import org.apache.calcite.sql.fun.SqlCase;
4242
import org.apache.calcite.sql.parser.SqlParseException;
@@ -101,8 +101,11 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
101101

102102
if(preIsSideJoin){
103103
preIsSideJoin = false;
104+
List<String> fieldNames = null;
104105
for(FieldReplaceInfo replaceInfo : replaceInfoList){
106+
fieldNames = Lists.newArrayList();
105107
replaceFieldName(pollSqlNode, replaceInfo.getMappingTable(), replaceInfo.getTargetTableName(), replaceInfo.getTargetTableAlias());
108+
addAliasForFieldNode(pollSqlNode, fieldNames, replaceInfo.getMappingTable());
106109
}
107110
}
108111

@@ -126,6 +129,68 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
126129

127130
}
128131

132+
133+
private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, HashBasedTable<String, String, String> mappingTable) {
134+
SqlKind sqlKind = pollSqlNode.getKind();
135+
switch (sqlKind) {
136+
case INSERT:
137+
SqlNode source = ((SqlInsert) pollSqlNode).getSource();
138+
addAliasForFieldNode(source, fieldList, mappingTable);
139+
break;
140+
141+
case AS:
142+
addAliasForFieldNode(((SqlBasicCall) pollSqlNode).getOperands()[0], fieldList, mappingTable);
143+
break;
144+
145+
case SELECT:
146+
147+
SqlNodeList selectList = ((SqlSelect) pollSqlNode).getSelectList();
148+
149+
selectList.getList().forEach(node -> {
150+
if (node.getKind() == IDENTIFIER) {
151+
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
152+
if (sqlIdentifier.names.size() == 1) {
153+
return;
154+
}
155+
// save real field
156+
String fieldName = sqlIdentifier.names.get(1);
157+
if (!fieldName.endsWith("0") || fieldName.endsWith("0") && mappingTable.columnMap().containsKey(fieldName)) {
158+
fieldList.add(fieldName);
159+
}
160+
161+
}
162+
});
163+
164+
for (int i = 0; i < selectList.getList().size(); i++) {
165+
SqlNode node = selectList.get(i);
166+
if (node.getKind() == IDENTIFIER) {
167+
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
168+
if (sqlIdentifier.names.size() == 1) {
169+
return;
170+
}
171+
172+
String name = sqlIdentifier.names.get(1);
173+
// avoid real field pv0 convert pv
174+
if (name.endsWith("0") && !fieldList.contains(name) && !fieldList.contains(name.substring(0, name.length() - 1))) {
175+
SqlOperator operator = new SqlAsOperator();
176+
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
177+
178+
SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(name.substring(0, name.length() - 1), null, sqlParserPos);
179+
SqlNode[] sqlNodes = new SqlNode[2];
180+
sqlNodes[0] = sqlIdentifier;
181+
sqlNodes[1] = sqlIdentifierAlias;
182+
SqlBasicCall sqlBasicCall = new SqlBasicCall(operator, sqlNodes, sqlParserPos);
183+
184+
selectList.set(i, sqlBasicCall);
185+
}
186+
187+
}
188+
}
189+
break;
190+
}
191+
}
192+
193+
129194
public AliasInfo parseASNode(SqlNode sqlNode) throws SqlParseException {
130195
SqlKind sqlKind = sqlNode.getKind();
131196
if(sqlKind != AS){
@@ -172,7 +237,7 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
172237
replaceFieldName(sqlSource, mappingTable, targetTableName, tableAlias);
173238
break;
174239
case AS:
175-
SqlNode asNode = ((SqlBasicCall)sqlNode).getOperands()[0];
240+
SqlNode asNode = ((SqlBasicCall) sqlNode).getOperands()[0];
176241
replaceFieldName(asNode, mappingTable, targetTableName, tableAlias);
177242
break;
178243
case SELECT:
@@ -267,7 +332,7 @@ private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String
267332
}
268333
}
269334

270-
public SqlNode filterNodeWithTargetName(SqlNode sqlNode, String targetTableName){
335+
public SqlNode filterNodeWithTargetName(SqlNode sqlNode, String targetTableName) {
271336

272337
SqlKind sqlKind = sqlNode.getKind();
273338
switch (sqlKind){
@@ -304,7 +369,7 @@ public SqlNode filterNodeWithTargetName(SqlNode sqlNode, String targetTableName)
304369
}
305370

306371

307-
public void setLocalSqlPluginPath(String localSqlPluginPath){
372+
public void setLocalSqlPluginPath(String localSqlPluginPath) {
308373
this.localSqlPluginPath = localSqlPluginPath;
309374
}
310375

@@ -348,12 +413,12 @@ private List<SqlNode> replaceSelectStarFieldName(SqlNode selectNode, HashBasedTa
348413
}
349414
}
350415

351-
private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String, String, String> mappingTable, String tableAlias){
352-
if(selectNode.getKind() == AS){
353-
SqlNode leftNode = ((SqlBasicCall)selectNode).getOperands()[0];
416+
private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String, String, String> mappingTable, String tableAlias) {
417+
if (selectNode.getKind() == AS) {
418+
SqlNode leftNode = ((SqlBasicCall) selectNode).getOperands()[0];
354419
SqlNode replaceNode = replaceSelectFieldName(leftNode, mappingTable, tableAlias);
355-
if(replaceNode != null){
356-
((SqlBasicCall)selectNode).getOperands()[0] = replaceNode;
420+
if (replaceNode != null) {
421+
((SqlBasicCall) selectNode).getOperands()[0] = replaceNode;
357422
}
358423

359424
return selectNode;
@@ -438,15 +503,15 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
438503
for(int i=0; i<whenOperands.size(); i++){
439504
SqlNode oneOperand = whenOperands.get(i);
440505
SqlNode replaceNode = replaceSelectFieldName(oneOperand, mappingTable, tableAlias);
441-
if(replaceNode != null){
506+
if (replaceNode != null) {
442507
whenOperands.set(i, replaceNode);
443508
}
444509
}
445510

446511
for(int i=0; i<thenOperands.size(); i++){
447512
SqlNode oneOperand = thenOperands.get(i);
448513
SqlNode replaceNode = replaceSelectFieldName(oneOperand, mappingTable, tableAlias);
449-
if(replaceNode != null){
514+
if (replaceNode != null) {
450515
thenOperands.set(i, replaceNode);
451516
}
452517
}
@@ -463,17 +528,18 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
463528

464529
/**
465530
* Analyzing conditions are very join the dimension tables include all equivalent conditions (i.e., dimension table is the primary key definition
531+
*
466532
* @return
467533
*/
468-
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, SideTableInfo sideTableInfo){
534+
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, SideTableInfo sideTableInfo) {
469535
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo);
470536
if(CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))){
471537
return true;
472538
}
473539
return false;
474540
}
475541

476-
private List<String> convertPrimaryAlias(SideTableInfo sideTableInfo){
542+
private List<String> convertPrimaryAlias(SideTableInfo sideTableInfo) {
477543
List<String> res = Lists.newArrayList();
478544
sideTableInfo.getPrimaryKeys().forEach(field -> {
479545
res.add(sideTableInfo.getPhysicalFields().getOrDefault(field, field));
@@ -535,8 +601,11 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
535601

536602
if(preIsSideJoin){
537603
preIsSideJoin = false;
538-
for(FieldReplaceInfo replaceInfo : replaceInfoList){
604+
List<String> fieldNames = null;
605+
for (FieldReplaceInfo replaceInfo : replaceInfoList) {
606+
fieldNames = Lists.newArrayList();
539607
replaceFieldName(pollSqlNode, replaceInfo.getMappingTable(), replaceInfo.getTargetTableName(), replaceInfo.getTargetTableAlias());
608+
addAliasForFieldNode(pollSqlNode, fieldNames, replaceInfo.getMappingTable());
540609
}
541610
}
542611

@@ -572,6 +641,7 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
572641
}
573642
}
574643
}
644+
575645
private void joinFun(Object pollObj, Map<String, Table> localTableCache,
576646
Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
577647
List<FieldReplaceInfo> replaceInfoList) throws Exception{
@@ -655,12 +725,11 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
655725
}
656726
}
657727

658-
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table){
728+
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
659729
List<String> fieldNames = new LinkedList<>();
660730
String fieldsInfo = result.getFieldsInfoStr();
661731
String[] fields = fieldsInfo.split(",");
662-
for (int i=0; i < fields.length; i++)
663-
{
732+
for (int i = 0; i < fields.length; i++) {
664733
String[] filed = fields[i].split("\\s");
665734
if (filed.length < 2 || fields.length != table.getSchema().getColumnNames().length){
666735
return false;

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
6262
// trigger preparedStatement execute batch interval
6363
private long batchWaitInterval = 10000l;
6464
// PreparedStatement execute batch num
65-
private int batchNum = 1;
65+
private int batchNum = 100;
6666
private String insertQuery;
6767
public int[] typesArray;
6868

0 commit comments

Comments
 (0)