Skip to content

Commit 282b4d0

Browse files
committed
[异步维表支持非等值连接]
1 parent fc035e7 commit 282b4d0

File tree

6 files changed

+43
-9
lines changed

6 files changed

+43
-9
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ public void parseSelectFields(JoinInfo joinInfo){
109109
}
110110

111111
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
112-
if(sqlNode.getKind() != SqlKind.EQUALS){
113-
throw new RuntimeException("not equal operator.");
112+
if(!SqlKind.COMPARISON.contains(sqlNode.getKind())){
113+
throw new RuntimeException("not compare operator.");
114114
}
115115

116116
SqlIdentifier left = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[0];

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,8 +585,8 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
585585
ParseUtils.parseAnd(conditionNode, sqlNodeList);
586586
List<String> conditionFields = Lists.newArrayList();
587587
for(SqlNode sqlNode : sqlNodeList){
588-
if(sqlNode.getKind() != SqlKind.EQUALS){
589-
throw new RuntimeException("not equal operator.");
588+
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
589+
throw new RuntimeException("not compare operator.");
590590
}
591591

592592
SqlIdentifier left = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[0];

core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
import org.apache.calcite.sql.SqlBasicCall;
44
import org.apache.calcite.sql.SqlKind;
55
import org.apache.calcite.sql.SqlNode;
6+
import org.apache.commons.lang3.StringUtils;
67

8+
import java.util.HashSet;
79
import java.util.List;
10+
import java.util.Set;
811

912
/**
1013
* @Auther: jiangjunjie
@@ -20,4 +23,25 @@ public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList){
2023
sqlNodeList.add(conditionNode);
2124
}
2225
}
26+
27+
public static void parseJoinCompareOperate(SqlNode condition, List<String> sqlJoinCompareOperate) {
28+
SqlBasicCall joinCondition = (SqlBasicCall) condition;
29+
if (joinCondition.getKind() == SqlKind.AND) {
30+
List<SqlNode> operandList = joinCondition.getOperandList();
31+
for (SqlNode sqlNode : operandList) {
32+
parseJoinCompareOperate(sqlNode, sqlJoinCompareOperate);
33+
}
34+
} else {
35+
String operator = parseOperator(joinCondition.getKind());
36+
sqlJoinCompareOperate.add(operator);
37+
}
38+
}
39+
40+
public static String parseOperator(SqlKind sqlKind) {
41+
if (StringUtils.equalsIgnoreCase(sqlKind.toString(), "NOT_EQUALS")){
42+
return "!=";
43+
}
44+
return sqlKind.sql;
45+
}
46+
2347
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2626
import com.dtstack.flink.sql.util.ParseUtils;
27+
import org.apache.calcite.sql.SqlKind;
2728
import org.apache.calcite.sql.SqlNode;
2829
import org.apache.commons.lang3.StringUtils;
2930
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -48,7 +49,11 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
4849
SqlNode conditionNode = joinInfo.getCondition();
4950

5051
List<SqlNode> sqlNodeList = Lists.newArrayList();
52+
List<String> sqlJoinCompareOperate= Lists.newArrayList();
53+
5154
ParseUtils.parseAnd(conditionNode, sqlNodeList);
55+
ParseUtils.parseJoinCompareOperate(conditionNode, sqlJoinCompareOperate);
56+
5257

5358
for (SqlNode sqlNode : sqlNodeList) {
5459
dealOneEqualCon(sqlNode, sideTableName);
@@ -57,8 +62,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5762
sqlCondition = "select ${selectField} from ${tableName} where ";
5863
for (int i = 0; i < equalFieldList.size(); i++) {
5964
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
60-
61-
sqlCondition += dealLowerFiled(equalField) + "=? ";
65+
sqlCondition += dealLowerFiled(equalField) + " " + sqlJoinCompareOperate.get(i) + " " + " ?";
6266
if (i != equalFieldList.size() - 1) {
6367
sqlCondition += " and ";
6468
}

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ private void buildInsertSql(String tableName, List<String> fields) {
7171
placeholder = placeholder.replaceFirst(",", "");
7272
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
7373
this.sql = sqlTmp;
74+
System.out.println("--------insert sqlTmp--------");
75+
System.out.println(sql);
7476
}
7577

7678
@Override

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5858
SqlNode conditionNode = joinInfo.getCondition();
5959

6060
List<SqlNode> sqlNodeList = Lists.newArrayList();
61+
62+
List<String> sqlJoinCompareOperate= Lists.newArrayList();
63+
6164
ParseUtils.parseAnd(conditionNode, sqlNodeList);
65+
ParseUtils.parseJoinCompareOperate(conditionNode, sqlJoinCompareOperate);
6266

6367
for (SqlNode sqlNode : sqlNodeList) {
6468
dealOneEqualCon(sqlNode, sideTableName);
@@ -68,7 +72,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
6872
for (int i = 0; i < equalFieldList.size(); i++) {
6973
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
7074

71-
sqlCondition += equalField + "=? ";
75+
sqlCondition += equalField + "\t" + sqlJoinCompareOperate.get(i) + " ?";
7276
if (i != equalFieldList.size() - 1) {
7377
sqlCondition += " and ";
7478
}
@@ -81,8 +85,8 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
8185

8286
@Override
8387
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
84-
if (sqlNode.getKind() != SqlKind.EQUALS) {
85-
throw new RuntimeException("not equal operator.");
88+
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
89+
throw new RuntimeException("not compare operator.");
8690
}
8791

8892
SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];

0 commit comments

Comments
 (0)