Skip to content

Commit ab6f516

Browse files
committed
all cache predicate pushdown
1 parent 899ae7b commit ab6f516

File tree

3 files changed

+43
-22
lines changed

3 files changed

+43
-22
lines changed

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class StreamSideFactory {
3939

4040
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
4141

42-
String sideOperator = ECacheType.ALL.name().equals(cacheType) ? "all" : "async";
42+
String sideOperator = ECacheType.ALL.name().equalsIgnoreCase(cacheType) ? "all" : "async";
4343
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
4444
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
4545

oracle/oracle-side/oracle-all-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAllSideInfo.java

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,26 +35,12 @@ public OracleAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI
3535
}
3636

3737
@Override
38-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
39-
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideTableInfo;
40-
41-
sqlCondition = "select ${selectField} from ${tableName} ";
42-
43-
44-
sqlCondition = sqlCondition.replace("${tableName}", DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
45-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
38+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
39+
return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
4640
}
4741

48-
49-
private String dealLowerSelectFiled(String fieldsStr) {
50-
StringBuilder sb = new StringBuilder();
51-
String[] fields = fieldsStr.split(",");
52-
53-
for(String f : fields) {
54-
sb.append("\"").append(f).append("\"").append(",");
55-
}
56-
57-
sb.deleteCharAt(sb.lastIndexOf(","));
58-
return sb.toString();
42+
@Override
43+
public String quoteIdentifier(String identifier) {
44+
return "\"" + identifier + "\"";
5945
}
6046
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllSideInfo.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
23+
import com.dtstack.flink.sql.side.PredicateInfo;
2324
import com.dtstack.flink.sql.side.SideInfo;
2425
import com.dtstack.flink.sql.side.SideTableInfo;
2526
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
@@ -29,7 +30,9 @@
2930
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3031
import com.google.common.collect.Lists;
3132

33+
import java.util.Arrays;
3234
import java.util.List;
35+
import java.util.stream.Collectors;
3336

3437
/**
3538
* Reason:
@@ -52,8 +55,15 @@ public RdbAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
5255
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5356
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideTableInfo;
5457

55-
sqlCondition = "select ${selectField} from ${tableName} ";
56-
sqlCondition = sqlCondition.replace("${tableName}", rdbSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
58+
sqlCondition = getSelectFromStatement(getTableName(rdbSideTableInfo), Arrays.asList(sideSelectFields.split(",")), sideTableInfo.getPredicateInfoes());
59+
System.out.println("--------side sql query-------\n" + sqlCondition);
60+
}
61+
62+
public String getSelectFromStatement(String tableName, List<String> selectFields, List<PredicateInfo> predicateInfoes) {
63+
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
64+
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
65+
String sql = "SELECT " + fromClause + " FROM " + tableName + (predicateInfoes.size() > 0 ? " WHERE " + predicateClause : "");
66+
return sql;
5767
}
5868

5969
@Override
@@ -107,4 +117,29 @@ public void parseSelectFields(JoinInfo joinInfo) {
107117

108118
sideSelectFields = String.join(",", fields);
109119
}
120+
121+
public String buildFilterCondition(PredicateInfo info) {
122+
switch (info.getOperatorKind()) {
123+
case "IN":
124+
case "NOT_IN":
125+
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName() + " ( " + info.getCondition() + " )";
126+
case "NOT_EQUALS":
127+
return quoteIdentifier(info.getFieldName()) + " != " + info.getCondition();
128+
case "BETWEEN":
129+
return quoteIdentifier(info.getFieldName()) + " BETWEEN " + info.getCondition();
130+
case "IS_NOT_NULL":
131+
case "IS_NULL":
132+
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName();
133+
default:
134+
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName() + " " + info.getCondition();
135+
}
136+
}
137+
138+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
139+
return rdbSideTableInfo.getTableName();
140+
}
141+
142+
public String quoteIdentifier(String identifier) {
143+
return " " + identifier + " ";
144+
}
110145
}

0 commit comments

Comments
 (0)