Skip to content

Commit d2e3e8b

Browse files
author
yanxi0227
committed
Merge branch '1.5_v3.5.5' into v1.5.0_dev
2 parents 79f0824 + 1461ef4 commit d2e3e8b

File tree

14 files changed

+61
-70
lines changed

14 files changed

+61
-70
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideInfo;
2424
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
26+
import com.dtstack.flink.sql.util.ParseUtils;
2627
import org.apache.calcite.sql.SqlBasicCall;
2728
import org.apache.calcite.sql.SqlKind;
2829
import org.apache.calcite.sql.SqlNode;
@@ -86,11 +87,7 @@ public void parseSelectFields(JoinInfo joinInfo) {
8687
SqlNode conditionNode = joinInfo.getCondition();
8788

8889
List<SqlNode> sqlNodeList = Lists.newArrayList();
89-
if (conditionNode.getKind() == SqlKind.AND) {
90-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands()));
91-
} else {
92-
sqlNodeList.add(conditionNode);
93-
}
90+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
9491

9592
for (SqlNode sqlNode : sqlNodeList) {
9693
dealOneEqualCon(sqlNode, sideTableName);

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideInfo;
2424
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
26+
import com.dtstack.flink.sql.util.ParseUtils;
2627
import org.apache.calcite.sql.SqlBasicCall;
2728
import org.apache.calcite.sql.SqlIdentifier;
2829
import org.apache.calcite.sql.SqlKind;
@@ -55,11 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5556
SqlNode conditionNode = joinInfo.getCondition();
5657

5758
List<SqlNode> sqlNodeList = Lists.newArrayList();
58-
if (conditionNode.getKind() == SqlKind.AND) {
59-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands()));
60-
} else {
61-
sqlNodeList.add(conditionNode);
62-
}
59+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
6360

6461
for (SqlNode sqlNode : sqlNodeList) {
6562
dealOneEqualCon(sqlNode, sideTableName);

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,8 @@ public static void main(String[] args) throws Exception {
139139
}
140140

141141
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
142-
DtClassLoader dtClassLoader = new DtClassLoader(new URL[]{}, threadClassLoader);
143-
Thread.currentThread().setContextClassLoader(dtClassLoader);
144-
145-
URLClassLoader parentClassloader;
146-
if(!ClusterMode.local.name().equals(deployMode)){
147-
parentClassloader = (URLClassLoader) threadClassLoader.getParent();
148-
}else{
149-
parentClassloader = dtClassLoader;
150-
}
142+
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
143+
Thread.currentThread().setContextClassLoader(parentClassloader);
151144

152145
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
153146
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
@@ -218,7 +211,7 @@ public static void main(String[] args) throws Exception {
218211

219212
if(env instanceof MyLocalStreamEnvironment) {
220213
List<URL> urlList = new ArrayList<>();
221-
urlList.addAll(Arrays.asList(dtClassLoader.getURLs()));
214+
urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
222215
((MyLocalStreamEnvironment) env).setClasspaths(urlList);
223216
}
224217

@@ -254,7 +247,6 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLo
254247
if (classLoader == null) {
255248
classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
256249
}
257-
classLoader.loadClass(funcInfo.getClassName());
258250
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
259251
tableEnv, classLoader);
260252
}

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,16 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
159159
JoinInfo tableInfo = new JoinInfo();
160160
tableInfo.setLeftTableName(leftTbName);
161161
tableInfo.setRightTableName(rightTableName);
162-
tableInfo.setLeftTableAlias(leftTbAlias);
163-
tableInfo.setRightTableAlias(rightTableAlias);
162+
if (leftTbAlias.equals("")){
163+
tableInfo.setLeftTableAlias(leftTbName);
164+
} else {
165+
tableInfo.setLeftTableAlias(leftTbAlias);
166+
}
167+
if (leftTbAlias.equals("")){
168+
tableInfo.setRightTableAlias(rightTableName);
169+
} else {
170+
tableInfo.setRightTableAlias(rightTableAlias);
171+
}
164172
tableInfo.setLeftIsSideTable(leftIsSide);
165173
tableInfo.setRightIsSideTable(rightIsSide);
166174
tableInfo.setLeftNode(leftNode);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
2626
import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
2727
import com.dtstack.flink.sql.util.ClassUtil;
28+
import com.dtstack.flink.sql.util.ParseUtils;
2829
import org.apache.calcite.sql.SqlBasicCall;
2930
import org.apache.calcite.sql.SqlDataTypeSpec;
3031
import org.apache.calcite.sql.SqlIdentifier;
@@ -435,7 +436,7 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
435436
}
436437
}
437438

438-
replaceSelectFieldName(elseNode, mappingTable, tableAlias);
439+
((SqlCase) selectNode).setOperand(3, replaceSelectFieldName(elseNode, mappingTable, tableAlias));
439440
return selectNode;
440441
}else if(selectNode.getKind() == OTHER){
441442
//不处理
@@ -461,12 +462,7 @@ private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias,
461462

462463
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName){
463464
List<SqlNode> sqlNodeList = Lists.newArrayList();
464-
if(conditionNode.getKind() == SqlKind.AND){
465-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
466-
}else{
467-
sqlNodeList.add(conditionNode);
468-
}
469-
465+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
470466
List<String> conditionFields = Lists.newArrayList();
471467
for(SqlNode sqlNode : sqlNodeList){
472468
if(sqlNode.getKind() != SqlKind.EQUALS){
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.dtstack.flink.sql.util;
2+
3+
import org.apache.calcite.sql.SqlBasicCall;
4+
import org.apache.calcite.sql.SqlKind;
5+
import org.apache.calcite.sql.SqlNode;
6+
7+
import java.util.List;
8+
9+
/**
10+
* @Auther: jiangjunjie
11+
* @Date: 2019-06-30 14:57
12+
* @Description:
13+
*/
14+
public class ParseUtils {
15+
public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList){
16+
if(conditionNode.getKind() == SqlKind.AND && ((SqlBasicCall)conditionNode).getOperandList().size()==2){
17+
parseAnd(((SqlBasicCall)conditionNode).getOperands()[0], sqlNodeList);
18+
sqlNodeList.add(((SqlBasicCall)conditionNode).getOperands()[1]);
19+
}else{
20+
sqlNodeList.add(conditionNode);
21+
}
22+
}
23+
}

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.dtstack.flink.sql.side.JoinInfo;
2525
import com.dtstack.flink.sql.side.SideInfo;
2626
import com.dtstack.flink.sql.side.SideTableInfo;
27+
import com.dtstack.flink.sql.util.ParseUtils;
2728
import org.apache.calcite.sql.SqlBasicCall;
2829
import org.apache.calcite.sql.SqlKind;
2930
import org.apache.calcite.sql.SqlNode;
@@ -53,11 +54,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5354
SqlNode conditionNode = joinInfo.getCondition();
5455

5556
List<SqlNode> sqlNodeList = Lists.newArrayList();
56-
if(conditionNode.getKind() == SqlKind.AND){
57-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
58-
}else{
59-
sqlNodeList.add(conditionNode);
60-
}
57+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
6158

6259
for(SqlNode sqlNode : sqlNodeList){
6360
dealOneEqualCon(sqlNode, sideTableName);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.dtstack.flink.sql.side.SideInfo;
66
import com.dtstack.flink.sql.side.SideTableInfo;
77
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
8+
import com.dtstack.flink.sql.util.ParseUtils;
89
import org.apache.calcite.sql.SqlBasicCall;
910
import org.apache.calcite.sql.SqlKind;
1011
import org.apache.calcite.sql.SqlNode;
@@ -55,11 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5556
SqlNode conditionNode = joinInfo.getCondition();
5657

5758
List<SqlNode> sqlNodeList = Lists.newArrayList();
58-
if(conditionNode.getKind() == SqlKind.AND){
59-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
60-
}else{
61-
sqlNodeList.add(conditionNode);
62-
}
59+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
6360

6461
for(SqlNode sqlNode : sqlNodeList){
6562
dealOneEqualCon(sqlNode, sideTableName);

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideInfo;
2424
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.side.mongo.table.MongoSideTableInfo;
26+
import com.dtstack.flink.sql.util.ParseUtils;
2627
import org.apache.calcite.sql.SqlBasicCall;
2728
import org.apache.calcite.sql.SqlKind;
2829
import org.apache.calcite.sql.SqlNode;
@@ -85,11 +86,7 @@ public void parseSelectFields(JoinInfo joinInfo){
8586
SqlNode conditionNode = joinInfo.getCondition();
8687

8788
List<SqlNode> sqlNodeList = Lists.newArrayList();
88-
if(conditionNode.getKind() == SqlKind.AND){
89-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
90-
}else{
91-
sqlNodeList.add(conditionNode);
92-
}
89+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
9390

9491
for(SqlNode sqlNode : sqlNodeList){
9592
dealOneEqualCon(sqlNode, sideTableName);

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideInfo;
2424
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.side.mongo.table.MongoSideTableInfo;
26+
import com.dtstack.flink.sql.util.ParseUtils;
2627
import org.apache.calcite.sql.SqlBasicCall;
2728
import org.apache.calcite.sql.SqlIdentifier;
2829
import org.apache.calcite.sql.SqlKind;
@@ -55,11 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5556
SqlNode conditionNode = joinInfo.getCondition();
5657

5758
List<SqlNode> sqlNodeList = Lists.newArrayList();
58-
if(conditionNode.getKind() == SqlKind.AND){
59-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
60-
}else{
61-
sqlNodeList.add(conditionNode);
62-
}
59+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
6360

6461
for(SqlNode sqlNode : sqlNodeList){
6562
dealOneEqualCon(sqlNode, sideTableName);

0 commit comments

Comments
 (0)