Skip to content

Commit 38f39fa

Browse files
committed
Merge branch '1.5_v3.5.7' into 1.5_v3.6.0
# Conflicts: # kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
2 parents 3440b9f + 3175aee commit 38f39fa

File tree

16 files changed

+75
-81
lines changed

16 files changed

+75
-81
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideInfo;
2424
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
26+
import com.dtstack.flink.sql.util.ParseUtils;
2627
import org.apache.calcite.sql.SqlBasicCall;
2728
import org.apache.calcite.sql.SqlKind;
2829
import org.apache.calcite.sql.SqlNode;
@@ -86,11 +87,7 @@ public void parseSelectFields(JoinInfo joinInfo) {
8687
SqlNode conditionNode = joinInfo.getCondition();
8788

8889
List<SqlNode> sqlNodeList = Lists.newArrayList();
89-
if (conditionNode.getKind() == SqlKind.AND) {
90-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands()));
91-
} else {
92-
sqlNodeList.add(conditionNode);
93-
}
90+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
9491

9592
for (SqlNode sqlNode : sqlNodeList) {
9693
dealOneEqualCon(sqlNode, sideTableName);

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideInfo;
2424
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
26+
import com.dtstack.flink.sql.util.ParseUtils;
2627
import org.apache.calcite.sql.SqlBasicCall;
2728
import org.apache.calcite.sql.SqlIdentifier;
2829
import org.apache.calcite.sql.SqlKind;
@@ -55,11 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5556
SqlNode conditionNode = joinInfo.getCondition();
5657

5758
List<SqlNode> sqlNodeList = Lists.newArrayList();
58-
if (conditionNode.getKind() == SqlKind.AND) {
59-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands()));
60-
} else {
61-
sqlNodeList.add(conditionNode);
62-
}
59+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
6360

6461
for (SqlNode sqlNode : sqlNodeList) {
6562
dealOneEqualCon(sqlNode, sideTableName);

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,8 @@ public static void main(String[] args) throws Exception {
140140
}
141141

142142
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
143-
DtClassLoader dtClassLoader = new DtClassLoader(new URL[]{}, threadClassLoader);
144-
Thread.currentThread().setContextClassLoader(dtClassLoader);
145-
146-
URLClassLoader parentClassloader;
147-
if(!ClusterMode.local.name().equals(deployMode)){
148-
parentClassloader = (URLClassLoader) threadClassLoader.getParent();
149-
}else{
150-
parentClassloader = dtClassLoader;
151-
}
143+
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
144+
Thread.currentThread().setContextClassLoader(parentClassloader);
152145

153146
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
154147
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
@@ -219,7 +212,7 @@ public static void main(String[] args) throws Exception {
219212

220213
if(env instanceof MyLocalStreamEnvironment) {
221214
List<URL> urlList = new ArrayList<>();
222-
urlList.addAll(Arrays.asList(dtClassLoader.getURLs()));
215+
urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
223216
((MyLocalStreamEnvironment) env).setClasspaths(urlList);
224217
}
225218

@@ -255,7 +248,6 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLo
255248
if (classLoader == null) {
256249
classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
257250
}
258-
classLoader.loadClass(funcInfo.getClassName());
259251
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
260252
tableEnv, classLoader);
261253
}

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,16 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
159159
JoinInfo tableInfo = new JoinInfo();
160160
tableInfo.setLeftTableName(leftTbName);
161161
tableInfo.setRightTableName(rightTableName);
162-
tableInfo.setLeftTableAlias(leftTbAlias);
163-
tableInfo.setRightTableAlias(rightTableAlias);
162+
if (leftTbAlias.equals("")){
163+
tableInfo.setLeftTableAlias(leftTbName);
164+
} else {
165+
tableInfo.setLeftTableAlias(leftTbAlias);
166+
}
167+
if (leftTbAlias.equals("")){
168+
tableInfo.setRightTableAlias(rightTableName);
169+
} else {
170+
tableInfo.setRightTableAlias(rightTableAlias);
171+
}
164172
tableInfo.setLeftIsSideTable(leftIsSide);
165173
tableInfo.setRightIsSideTable(rightIsSide);
166174
tableInfo.setLeftNode(leftNode);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
2626
import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
2727
import com.dtstack.flink.sql.util.ClassUtil;
28+
import com.dtstack.flink.sql.util.ParseUtils;
2829
import org.apache.calcite.sql.SqlBasicCall;
2930
import org.apache.calcite.sql.SqlDataTypeSpec;
3031
import org.apache.calcite.sql.SqlIdentifier;
@@ -393,6 +394,10 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
393394
|| selectNode.getKind() == BETWEEN
394395
|| selectNode.getKind() == IS_NULL
395396
|| selectNode.getKind() == IS_NOT_NULL
397+
|| selectNode.getKind() == LESS_THAN
398+
|| selectNode.getKind() == GREATER_THAN
399+
|| selectNode.getKind() == LESS_THAN_OR_EQUAL
400+
|| selectNode.getKind() == GREATER_THAN_OR_EQUAL
396401
){
397402
SqlBasicCall sqlBasicCall = (SqlBasicCall) selectNode;
398403
for(int i=0; i<sqlBasicCall.getOperands().length; i++){
@@ -437,7 +442,7 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
437442
}
438443
}
439444

440-
replaceSelectFieldName(elseNode, mappingTable, tableAlias);
445+
((SqlCase) selectNode).setOperand(3, replaceSelectFieldName(elseNode, mappingTable, tableAlias));
441446
return selectNode;
442447
}else if(selectNode.getKind() == OTHER){
443448
//不处理
@@ -463,12 +468,7 @@ private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias,
463468

464469
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName){
465470
List<SqlNode> sqlNodeList = Lists.newArrayList();
466-
if(conditionNode.getKind() == SqlKind.AND){
467-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
468-
}else{
469-
sqlNodeList.add(conditionNode);
470-
}
471-
471+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
472472
List<String> conditionFields = Lists.newArrayList();
473473
for(SqlNode sqlNode : sqlNodeList){
474474
if(sqlNode.getKind() != SqlKind.EQUALS){

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public boolean dealKeyPattern(String fieldRow, TableInfo tableInfo){
7979

8080
public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
8181

82-
String[] fieldRows = DtStringUtil.splitIgnoreQuotaBrackets(fieldsInfo, ",");
82+
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
8383
for(String fieldRow : fieldRows){
8484
fieldRow = fieldRow.trim();
8585
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,16 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
5757
List<String> tokensList = new ArrayList<>();
5858
boolean inQuotes = false;
5959
boolean inSingleQuotes = false;
60+
int bracketLeftNum = 0;
6061
StringBuilder b = new StringBuilder();
6162
for (char c : str.toCharArray()) {
6263
if(c == delimiter){
6364
if (inQuotes) {
6465
b.append(c);
6566
} else if(inSingleQuotes){
6667
b.append(c);
68+
} else if(bracketLeftNum > 0){
69+
b.append(c);
6770
}else {
6871
tokensList.add(b.toString());
6972
b = new StringBuilder();
@@ -74,6 +77,12 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
7477
}else if(c == '\''){
7578
inSingleQuotes = !inSingleQuotes;
7679
b.append(c);
80+
}else if(c == '('){
81+
bracketLeftNum++;
82+
b.append(c);
83+
}else if(c == ')'){
84+
bracketLeftNum--;
85+
b.append(c);
7786
}else{
7887
b.append(c);
7988
}
@@ -84,16 +93,6 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
8493
return tokensList;
8594
}
8695

87-
/***
88-
* Split the specified string delimiter --- ignored in brackets and quotation marks delimiter
89-
* @param str
90-
* @param delimter
91-
* @return
92-
*/
93-
public static String[] splitIgnoreQuotaBrackets(String str, String delimter){
94-
String splitPatternStr = delimter + "(?![^()]*+\\))(?![^{}]*+})(?![^\\[\\]]*+\\])(?=(?:[^\"]|\"[^\"]*\")*$)";
95-
return str.split(splitPatternStr);
96-
}
9796

9897
public static String replaceIgnoreQuota(String str, String oriStr, String replaceStr){
9998
String splitPatternStr = oriStr + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)(?=(?:[^']*'[^']*')*[^']*$)";
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.dtstack.flink.sql.util;
2+
3+
import org.apache.calcite.sql.SqlBasicCall;
4+
import org.apache.calcite.sql.SqlKind;
5+
import org.apache.calcite.sql.SqlNode;
6+
7+
import java.util.List;
8+
9+
/**
10+
* @Auther: jiangjunjie
11+
* @Date: 2019-06-30 14:57
12+
* @Description:
13+
*/
14+
public class ParseUtils {
15+
public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList){
16+
if(conditionNode.getKind() == SqlKind.AND && ((SqlBasicCall)conditionNode).getOperandList().size()==2){
17+
parseAnd(((SqlBasicCall)conditionNode).getOperands()[0], sqlNodeList);
18+
sqlNodeList.add(((SqlBasicCall)conditionNode).getOperands()[1]);
19+
}else{
20+
sqlNodeList.add(conditionNode);
21+
}
22+
}
23+
}

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.dtstack.flink.sql.side.JoinInfo;
2525
import com.dtstack.flink.sql.side.SideInfo;
2626
import com.dtstack.flink.sql.side.SideTableInfo;
27+
import com.dtstack.flink.sql.util.ParseUtils;
2728
import org.apache.calcite.sql.SqlBasicCall;
2829
import org.apache.calcite.sql.SqlKind;
2930
import org.apache.calcite.sql.SqlNode;
@@ -53,11 +54,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5354
SqlNode conditionNode = joinInfo.getCondition();
5455

5556
List<SqlNode> sqlNodeList = Lists.newArrayList();
56-
if(conditionNode.getKind() == SqlKind.AND){
57-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
58-
}else{
59-
sqlNodeList.add(conditionNode);
60-
}
57+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
6158

6259
for(SqlNode sqlNode : sqlNodeList){
6360
dealOneEqualCon(sqlNode, sideTableName);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.dtstack.flink.sql.side.SideInfo;
66
import com.dtstack.flink.sql.side.SideTableInfo;
77
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
8+
import com.dtstack.flink.sql.util.ParseUtils;
89
import org.apache.calcite.sql.SqlBasicCall;
910
import org.apache.calcite.sql.SqlKind;
1011
import org.apache.calcite.sql.SqlNode;
@@ -55,11 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5556
SqlNode conditionNode = joinInfo.getCondition();
5657

5758
List<SqlNode> sqlNodeList = Lists.newArrayList();
58-
if(conditionNode.getKind() == SqlKind.AND){
59-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
60-
}else{
61-
sqlNodeList.add(conditionNode);
62-
}
59+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
6360

6461
for(SqlNode sqlNode : sqlNodeList){
6562
dealOneEqualCon(sqlNode, sideTableName);

0 commit comments

Comments
 (0)