Skip to content

Commit 1d73263

Browse files
committed
modify impala side sql
1 parent ab6f516 commit 1d73263

File tree

4 files changed

+69
-98
lines changed

4 files changed

+69
-98
lines changed

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllSideInfo.java

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.stream.Collectors;
3132

3233
public class ImpalaAllSideInfo extends RdbAllSideInfo {
3334

@@ -36,46 +37,34 @@ public ImpalaAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI
3637
}
3738

3839
@Override
39-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
40+
public String getAdditionalWhereClause() {
4041
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideTableInfo;
42+
return impalaSideTableInfo.isEnablePartition() ? buildPartitionCondition(impalaSideTableInfo) : "";
43+
}
4144

42-
boolean enablePartiton = impalaSideTableInfo.isEnablePartition();
43-
44-
String sqlTmp = "select ${selectField} from ${tableName} ";
45-
sqlCondition = sqlTmp.replace("${tableName}", impalaSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
46-
47-
if (enablePartiton){
48-
String whereTmp = "where ";
49-
String[] partitionfields = impalaSideTableInfo.getPartitionfields();
50-
String[] partitionFieldTypes = impalaSideTableInfo.getPartitionFieldTypes();
51-
Map<String, List> partitionVaules = impalaSideTableInfo.getPartitionValues();
52-
int fieldsSize = partitionfields.length;
53-
for (int i=0; i < fieldsSize; i++) {
54-
String fieldName = partitionfields[i];
55-
String fieldType = partitionFieldTypes[i];
56-
List values = partitionVaules.get(fieldName);
57-
String vauleAppend = getVauleAppend(fieldType, values);
58-
if (fieldsSize - 1 == i) {
59-
whereTmp = whereTmp + String.format("%s in (%s)", fieldName, vauleAppend);
60-
}else {
61-
whereTmp = whereTmp + String.format("%s in (%s) and ", fieldName, vauleAppend);
62-
}
45+
private String buildPartitionCondition(ImpalaSideTableInfo impalaSideTableInfo) {
46+
String partitionCondtion = " ";
47+
String[] partitionfields = impalaSideTableInfo.getPartitionfields();
48+
String[] partitionFieldTypes = impalaSideTableInfo.getPartitionFieldTypes();
49+
Map<String, List> partitionVaules = impalaSideTableInfo.getPartitionValues();
6350

64-
}
65-
sqlCondition = sqlCondition + whereTmp;
51+
int fieldsSize = partitionfields.length;
52+
for (int i = 0; i < fieldsSize; i++) {
53+
String fieldName = partitionfields[i];
54+
String fieldType = partitionFieldTypes[i];
55+
List values = partitionVaules.get(fieldName);
56+
String partitionVaule = getPartitionVaule(fieldType, values);
57+
partitionCondtion += String.format("AND %s IN (%s) ", fieldName, partitionVaule);
6658
}
59+
return partitionCondtion;
6760
}
6861

69-
public String getVauleAppend(String fieldType, List values) {
70-
String vauleAppend = "";
71-
for(int i=0; i < values.size(); i++) {
72-
if (fieldType.toLowerCase().equals("string") || fieldType.toLowerCase().equals("varchar")) {
73-
vauleAppend = vauleAppend + "," + "'" + values.get(i) + "'";
74-
continue;
75-
}
76-
vauleAppend = vauleAppend + "," + values.get(i).toString();
77-
}
78-
vauleAppend = vauleAppend.replaceFirst(",", "");
79-
return vauleAppend;
62+
private String getPartitionVaule(String fieldType, List values) {
63+
String partitionVaule = values.stream().map(val -> {
64+
return (fieldType.toLowerCase().equals("string") || fieldType.toLowerCase().equals("varchar")) ? "'" + val + "'" : val.toString();
65+
}).collect(Collectors.joining(" , ")).toString();
66+
67+
return partitionVaule;
8068
}
69+
8170
}

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncSideInfo.java

Lines changed: 23 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,11 @@
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2525
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
26-
import com.dtstack.flink.sql.util.ParseUtils;
27-
import com.google.common.collect.Lists;
28-
import org.apache.calcite.sql.SqlNode;
29-
import org.apache.calcite.sql.SqlSelect;
3026
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3127

3228
import java.util.List;
3329
import java.util.Map;
30+
import java.util.stream.Collectors;
3431

3532
/**
3633
* Date: 2019/11/12
@@ -46,68 +43,36 @@ public ImpalaAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Fiel
4643
}
4744

4845
@Override
49-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
46+
public String getAdditionalWhereClause() {
5047
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideTableInfo;
48+
return impalaSideTableInfo.isEnablePartition() ? buildPartitionCondition(impalaSideTableInfo) : "";
49+
}
5150

52-
String sideTableName = joinInfo.getSideTableName();
53-
54-
SqlNode conditionNode = joinInfo.getCondition();
55-
56-
List<SqlNode> sqlNodeList = Lists.newArrayList();
57-
58-
List<String> sqlJoinCompareOperate= Lists.newArrayList();
59-
60-
ParseUtils.parseAnd(conditionNode, sqlNodeList);
61-
ParseUtils.parseJoinCompareOperate(conditionNode, sqlJoinCompareOperate);
62-
63-
for (SqlNode sqlNode : sqlNodeList) {
64-
dealOneEqualCon(sqlNode, sideTableName);
65-
}
6651

67-
sqlCondition = "select ${selectField} from ${tableName} where ";
68-
for (int i = 0; i < equalFieldList.size(); i++) {
69-
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
52+
private String buildPartitionCondition(ImpalaSideTableInfo impalaSideTableInfo) {
53+
String partitionCondtion = " ";
54+
String[] partitionfields = impalaSideTableInfo.getPartitionfields();
55+
String[] partitionFieldTypes = impalaSideTableInfo.getPartitionFieldTypes();
56+
Map<String, List> partitionVaules = impalaSideTableInfo.getPartitionValues();
7057

71-
sqlCondition += equalField + " " + sqlJoinCompareOperate.get(i) + " ? ";
72-
if (i != equalFieldList.size() - 1) {
73-
sqlCondition += " and ";
74-
}
58+
int fieldsSize = partitionfields.length;
59+
for (int i = 0; i < fieldsSize; i++) {
60+
String fieldName = partitionfields[i];
61+
String fieldType = partitionFieldTypes[i];
62+
List values = partitionVaules.get(fieldName);
63+
String partitionVaule = getPartitionVaule(fieldType, values);
64+
partitionCondtion += String.format("AND %s IN (%s) ", fieldName, partitionVaule);
7565
}
76-
sqlCondition = sqlCondition.replace("${tableName}", impalaSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
77-
78-
boolean enablePartiton = impalaSideTableInfo.isEnablePartition();
79-
if (enablePartiton){
80-
String whereTmp = " ";
81-
String[] partitionfields = impalaSideTableInfo.getPartitionfields();
82-
String[] partitionFieldTypes = impalaSideTableInfo.getPartitionFieldTypes();
83-
Map<String, List> partitionVaules = impalaSideTableInfo.getPartitionValues();
84-
int fieldsSize = partitionfields.length;
85-
for (int i=0; i < fieldsSize; i++) {
86-
String fieldName = partitionfields[i];
87-
String fieldType = partitionFieldTypes[i];
88-
List values = partitionVaules.get(fieldName);
89-
String vauleAppend = getVauleAppend(fieldType, values);
90-
whereTmp = whereTmp + String.format("and %s in (%s) ", fieldName, vauleAppend);
66+
return partitionCondtion;
67+
}
9168

92-
}
93-
sqlCondition = sqlCondition + whereTmp;
94-
}
9569

96-
System.out.println("--------side sql query:-------------------");
97-
System.out.println(sqlCondition);
98-
}
70+
private String getPartitionVaule(String fieldType, List values) {
71+
String partitionVaule = values.stream().map(val -> {
72+
return (fieldType.toLowerCase().equals("string") || fieldType.toLowerCase().equals("varchar")) ? "'" + val + "'" : val.toString();
73+
}).collect(Collectors.joining(" , ")).toString();
9974

100-
public String getVauleAppend(String fieldType, List values) {
101-
String vauleAppend = "";
102-
for(int i=0; i < values.size(); i++) {
103-
if (fieldType.toLowerCase().equals("string") || fieldType.toLowerCase().equals("varchar")) {
104-
vauleAppend = vauleAppend + "," + "'" + values.get(i) + "'";
105-
continue;
106-
}
107-
vauleAppend = vauleAppend + "," + values.get(i).toString();
108-
}
109-
vauleAppend = vauleAppend.replaceFirst(",", "");
110-
return vauleAppend;
75+
return partitionVaule;
11176
}
11277

11378
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllSideInfo.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.dtstack.flink.sql.util.ParseUtils;
2828
import org.apache.calcite.sql.SqlNode;
2929
import org.apache.commons.collections.CollectionUtils;
30+
import org.apache.commons.lang3.StringUtils;
3031
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3132
import com.google.common.collect.Lists;
3233

@@ -54,18 +55,29 @@ public RdbAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
5455
@Override
5556
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5657
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideTableInfo;
57-
5858
sqlCondition = getSelectFromStatement(getTableName(rdbSideTableInfo), Arrays.asList(sideSelectFields.split(",")), sideTableInfo.getPredicateInfoes());
59-
System.out.println("--------side sql query-------\n" + sqlCondition);
59+
System.out.println("-------- all side sql query-------\n" + sqlCondition);
60+
}
61+
62+
public String getAdditionalWhereClause() {
63+
return "";
6064
}
6165

62-
public String getSelectFromStatement(String tableName, List<String> selectFields, List<PredicateInfo> predicateInfoes) {
66+
private String getSelectFromStatement(String tableName, List<String> selectFields, List<PredicateInfo> predicateInfoes) {
6367
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
6468
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
65-
String sql = "SELECT " + fromClause + " FROM " + tableName + (predicateInfoes.size() > 0 ? " WHERE " + predicateClause : "");
69+
String whereClause = buildWhereClause(predicateClause);
70+
String sql = "SELECT " + fromClause + " FROM " + tableName + whereClause;
6671
return sql;
6772
}
6873

74+
private String buildWhereClause(String predicateClause) {
75+
String additionalWhereClause = getAdditionalWhereClause();
76+
String whereClause = (!StringUtils.isEmpty(predicateClause) || !StringUtils.isEmpty(additionalWhereClause) ? " WHERE " + predicateClause : "");
77+
whereClause += (StringUtils.isEmpty(predicateClause)) ? additionalWhereClause.replaceFirst("AND", "") : additionalWhereClause;
78+
return whereClause;
79+
}
80+
6981
@Override
7082
public void parseSelectFields(JoinInfo joinInfo) {
7183
String sideTableName = joinInfo.getSideTableName();

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
129129

130130
}
131131

132+
public String getAdditionalWhereClause() {
133+
return "";
134+
}
135+
136+
132137
public String getSelectFromStatement(String tableName, List<String> selectFields, List<String> conditionFields, List<String> sqlJoinCompareOperate,
133138
List<PredicateInfo> predicateInfoes) {
134139
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
@@ -137,7 +142,7 @@ public String getSelectFromStatement(String tableName, List<String> selectFields
137142
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
138143

139144
String sql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
140-
+ (predicateInfoes.size() > 0 ? " AND " + predicateClause : "");
145+
+ (predicateInfoes.size() > 0 ? " AND " + predicateClause : "") + getAdditionalWhereClause();
141146
return sql;
142147
}
143148

0 commit comments

Comments
 (0)