Skip to content

Commit ed5c333

Browse files
author
杨思枢_思枢
committed
Merge branch 'v1.8.0_dev_innnerjoin_notequal' into 'v1.8.0_dev'
[异步维表支持非等值连接] See merge request !66
2 parents fc035e7 + a3dc6c6 commit ed5c333

File tree

8 files changed

+41
-13
lines changed

8 files changed

+41
-13
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5454
sqlCondition = "select ${selectField} from ${tableName} ";
5555
sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase() + "."
5656
+ cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
57-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
5857
}
5958

6059
@Override

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ public void parseSelectFields(JoinInfo joinInfo){
109109
}
110110

111111
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
112-
if(sqlNode.getKind() != SqlKind.EQUALS){
113-
throw new RuntimeException("not equal operator.");
112+
if(!SqlKind.COMPARISON.contains(sqlNode.getKind())){
113+
throw new RuntimeException("not compare operator.");
114114
}
115115

116116
SqlIdentifier left = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[0];

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,8 +585,8 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
585585
ParseUtils.parseAnd(conditionNode, sqlNodeList);
586586
List<String> conditionFields = Lists.newArrayList();
587587
for(SqlNode sqlNode : sqlNodeList){
588-
if(sqlNode.getKind() != SqlKind.EQUALS){
589-
throw new RuntimeException("not equal operator.");
588+
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
589+
throw new RuntimeException("not compare operator.");
590590
}
591591

592592
SqlIdentifier left = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[0];

core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
import org.apache.calcite.sql.SqlBasicCall;
44
import org.apache.calcite.sql.SqlKind;
55
import org.apache.calcite.sql.SqlNode;
6+
import org.apache.commons.lang3.StringUtils;
67

8+
import java.util.HashSet;
79
import java.util.List;
10+
import java.util.Set;
811

912
/**
1013
* @Auther: jiangjunjie
@@ -20,4 +23,25 @@ public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList){
2023
sqlNodeList.add(conditionNode);
2124
}
2225
}
26+
27+
public static void parseJoinCompareOperate(SqlNode condition, List<String> sqlJoinCompareOperate) {
28+
SqlBasicCall joinCondition = (SqlBasicCall) condition;
29+
if (joinCondition.getKind() == SqlKind.AND) {
30+
List<SqlNode> operandList = joinCondition.getOperandList();
31+
for (SqlNode sqlNode : operandList) {
32+
parseJoinCompareOperate(sqlNode, sqlJoinCompareOperate);
33+
}
34+
} else {
35+
String operator = parseOperator(joinCondition.getKind());
36+
sqlJoinCompareOperate.add(operator);
37+
}
38+
}
39+
40+
public static String parseOperator(SqlKind sqlKind) {
41+
if (StringUtils.equalsIgnoreCase(sqlKind.toString(), "NOT_EQUALS")){
42+
return "!=";
43+
}
44+
return sqlKind.sql;
45+
}
46+
2347
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ public static void main(String[] args) throws Exception {
100100
clusterClient.shutdown();
101101
}
102102

103-
System.out.println("---submit end----");
104103
}
105104

106105
private static String[] parseJson(String[] args) {

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2626
import com.dtstack.flink.sql.util.ParseUtils;
27+
import org.apache.calcite.sql.SqlKind;
2728
import org.apache.calcite.sql.SqlNode;
2829
import org.apache.commons.lang3.StringUtils;
2930
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -48,7 +49,11 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
4849
SqlNode conditionNode = joinInfo.getCondition();
4950

5051
List<SqlNode> sqlNodeList = Lists.newArrayList();
52+
List<String> sqlJoinCompareOperate= Lists.newArrayList();
53+
5154
ParseUtils.parseAnd(conditionNode, sqlNodeList);
55+
ParseUtils.parseJoinCompareOperate(conditionNode, sqlJoinCompareOperate);
56+
5257

5358
for (SqlNode sqlNode : sqlNodeList) {
5459
dealOneEqualCon(sqlNode, sideTableName);
@@ -57,8 +62,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5762
sqlCondition = "select ${selectField} from ${tableName} where ";
5863
for (int i = 0; i < equalFieldList.size(); i++) {
5964
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
60-
61-
sqlCondition += dealLowerFiled(equalField) + "=? ";
65+
sqlCondition += dealLowerFiled(equalField) + " " + sqlJoinCompareOperate.get(i) + " " + " ?";
6266
if (i != equalFieldList.size() - 1) {
6367
sqlCondition += " and ";
6468
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllSideInfo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5656

5757
sqlCondition = "select ${selectField} from ${tableName} ";
5858
sqlCondition = sqlCondition.replace("${tableName}", rdbSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
59-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
6059
}
6160

6261
@Override

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5858
SqlNode conditionNode = joinInfo.getCondition();
5959

6060
List<SqlNode> sqlNodeList = Lists.newArrayList();
61+
62+
List<String> sqlJoinCompareOperate= Lists.newArrayList();
63+
6164
ParseUtils.parseAnd(conditionNode, sqlNodeList);
65+
ParseUtils.parseJoinCompareOperate(conditionNode, sqlJoinCompareOperate);
6266

6367
for (SqlNode sqlNode : sqlNodeList) {
6468
dealOneEqualCon(sqlNode, sideTableName);
@@ -68,21 +72,20 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
6872
for (int i = 0; i < equalFieldList.size(); i++) {
6973
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
7074

71-
sqlCondition += equalField + "=? ";
75+
sqlCondition += equalField + "\t" + sqlJoinCompareOperate.get(i) + " ?";
7276
if (i != equalFieldList.size() - 1) {
7377
sqlCondition += " and ";
7478
}
7579
}
7680

7781
sqlCondition = sqlCondition.replace("${tableName}", rdbSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
78-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
7982
}
8083

8184

8285
@Override
8386
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
84-
if (sqlNode.getKind() != SqlKind.EQUALS) {
85-
throw new RuntimeException("not equal operator.");
87+
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
88+
throw new RuntimeException("not compare operator.");
8689
}
8790

8891
SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];

0 commit comments

Comments
 (0)