Skip to content

Commit b3037d4

Browse files
committed
Merge branch '1.8_dev_multi_join' into 'v1.8.0_dev'
连续Left join处理 See merge request !86
2 parents 3fc3adf + 49bc561 commit b3037d4

File tree

8 files changed

+444
-42
lines changed

8 files changed

+444
-42
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql;
2222

23+
import com.dtstack.flink.sql.config.CalciteConfig;
2324
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2425
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2526
import com.dtstack.flink.sql.enums.ClusterMode;
@@ -98,10 +99,6 @@ public class Main {
9899

99100
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
100101

101-
private static Config config = org.apache.calcite.sql.parser.SqlParser
102-
.configBuilder()
103-
.setLex(Lex.MYSQL)
104-
.build();
105102

106103
public static void main(String[] args) throws Exception {
107104

@@ -171,7 +168,7 @@ private static void sqlTranslation(Options options,StreamTableEnvironment tableE
171168
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
172169
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
173170

174-
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql,config).parseStmt();
171+
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
175172
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
176173
tmp.setExecSql(tmpSql);
177174
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.config;
21+
22+
import org.apache.calcite.config.Lex;
23+
import org.apache.calcite.sql.parser.SqlParser;
24+
import org.apache.calcite.sql.parser.SqlParser.Config;
25+
26+
public class CalciteConfig {
27+
28+
public static Config MYSQL_LEX_CONFIG = SqlParser
29+
.configBuilder()
30+
.setLex(Lex.MYSQL)
31+
.build();
32+
33+
34+
35+
}

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2626

2727
import java.io.Serializable;
28+
import java.util.Map;
2829

2930
/**
3031
* Join信息
@@ -40,6 +41,8 @@ public class JoinInfo implements Serializable {
4041

4142
//左表是否是维表
4243
private boolean leftIsSideTable;
44+
//左表是 转换后的中间表
45+
private boolean leftIsMidTable;
4346

4447
//右表是否是维表
4548
private boolean rightIsSideTable;
@@ -63,6 +66,8 @@ public class JoinInfo implements Serializable {
6366
private SqlNode selectNode;
6467

6568
private JoinType joinType;
69+
// 左边是中间转换表,做表映射关系,给替换属性名称使用
70+
private Map<String, String> leftTabMapping;
6671

6772
public String getSideTableName(){
6873
if(leftIsSideTable){
@@ -87,6 +92,22 @@ public String getNewTableName(){
8792
return leftStr + "_" + rightTableName;
8893
}
8994

95+
public boolean isLeftIsMidTable() {
96+
return leftIsMidTable;
97+
}
98+
99+
public void setLeftIsMidTable(boolean leftIsMidTable) {
100+
this.leftIsMidTable = leftIsMidTable;
101+
}
102+
103+
public Map<String, String> getLeftTabMapping() {
104+
return leftTabMapping;
105+
}
106+
107+
public void setLeftTabMapping(Map<String, String> leftTabMapping) {
108+
this.leftTabMapping = leftTabMapping;
109+
}
110+
90111
public String getNewTableAlias(){
91112
return leftTableAlias + "_" + rightTableAlias;
92113
}

core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,12 @@
4141

4242
public class ParserJoinField {
4343

44+
4445
/**
45-
* Need to parse the fields of information and where selectlist
46+
* build row by field
47+
* @param sqlNode select node
48+
* @param scope join left and right table all info
49+
* @param getAll true,get all fields from two tables; false, extract useful field from select node
4650
* @return
4751
*/
4852
public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, boolean getAll){

0 commit comments

Comments
 (0)