Skip to content

Commit 83c064f

Browse files
author
dapeng
committed
Merge branch 'hotfix_1.8_zy_24996' into hotfix_1.8_3.10.x_25114
2 parents ca8d08d + f1e1d28 commit 83c064f

File tree

6 files changed

+12
-11
lines changed

6 files changed

+12
-11
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,12 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
132132

133133
String leftTableName = left.getComponent(0).getSimple();
134134
String leftField = left.getComponent(1).getSimple();
135-
Map<String, String> physicalFields = sideTableInfo.getPhysicalFields();
136135

137136
String rightTableName = right.getComponent(0).getSimple();
138137
String rightField = right.getComponent(1).getSimple();
139138

140139
if(leftTableName.equalsIgnoreCase(sideTableName)){
141-
equalFieldList.add(physicalFields.get(leftField));
140+
equalFieldList.add(leftField);
142141
int equalFieldIndex = -1;
143142
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
144143
String fieldName = rowTypeInfo.getFieldNames()[i];
@@ -154,7 +153,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
154153

155154
}else if(rightTableName.equalsIgnoreCase(sideTableName)){
156155

157-
equalFieldList.add(physicalFields.get(rightField));
156+
equalFieldList.add(rightField);
158157
int equalFieldIndex = -1;
159158
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
160159
String fieldName = rowTypeInfo.getFieldNames()[i];

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ private void parseSql(SqlNode sqlNode, Map<String, SideTableInfo> sideTableMap,
101101
parseSql(unionLeft, sideTableMap, tabMapping);
102102
parseSql(unionRight, sideTableMap, tabMapping);
103103
break;
104+
default:
105+
break;
104106
}
105107
}
106108

@@ -128,10 +130,10 @@ private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predica
128130

129131
// 跳过函数
130132
if ((((SqlBasicCall) whereNode).getOperands()[0] instanceof SqlIdentifier)
131-
&& (((SqlBasicCall) whereNode).getOperands()[1].getKind() != SqlKind.OTHER_FUNCTION)) {
133+
&& (((SqlBasicCall) whereNode).getOperands()[1].getKind() == SqlKind.LITERAL)) {
132134
fillPredicateInfoToList((SqlBasicCall) whereNode, predicatesInfoList, operatorName, operatorKind, 0, 1);
133135
} else if ((((SqlBasicCall) whereNode).getOperands()[1] instanceof SqlIdentifier)
134-
&& (((SqlBasicCall) whereNode).getOperands()[0].getKind() != SqlKind.OTHER_FUNCTION)) {
136+
&& (((SqlBasicCall) whereNode).getOperands()[0].getKind() == LITERAL)) {
135137
fillPredicateInfoToList((SqlBasicCall) whereNode, predicatesInfoList, operatorName, operatorKind, 1, 0);
136138
}
137139
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public void open(Configuration parameters) throws Exception {
124124

125125
@Override
126126
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
127-
CRow inputCopy = new CRow(input.row(), input.change());
127+
CRow inputCopy = new CRow(Row.copy(input.row()), input.change());
128128
Map<String, Object> refData = Maps.newHashMap();
129129
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
130130
Integer conValIndex = sideInfo.getEqualValIndex().get(i);

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ private void dealField(Matcher matcher, TableInfo tableInfo){
100100
sideTableInfo.addFieldClass(fieldClass);
101101
sideTableInfo.addFieldType(fieldType);
102102
sideTableInfo.putAliasNameRef(aliasStr, fieldName);
103+
sideTableInfo.addPhysicalMappings(aliasStr, fieldName);
103104
}
104105

105106

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ protected void init(SideInfo sideInfo) {
8888

8989
@Override
9090
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
91-
CRow copyCrow = new CRow(input.row(), input.change());
91+
CRow copyCrow = new CRow(Row.copy(input.row()), input.change());
9292
JsonArray inputParams = new JsonArray();
9393
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
9494
Object equalObj = copyCrow.row().getField(conValIndex);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
8787

8888
SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
8989
SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];
90-
Map<String, String> physicalFields = sideTableInfo.getPhysicalFields();
9190

9291
String leftTableName = left.getComponent(0).getSimple();
9392
String leftField = left.getComponent(1).getSimple();
@@ -96,7 +95,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
9695
String rightField = right.getComponent(1).getSimple();
9796

9897
if (leftTableName.equalsIgnoreCase(sideTableName)) {
99-
equalFieldList.add(physicalFields.get(leftField));
98+
equalFieldList.add(leftField);
10099
int equalFieldIndex = -1;
101100
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
102101
String fieldName = rowTypeInfo.getFieldNames()[i];
@@ -112,7 +111,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
112111

113112
} else if (rightTableName.equalsIgnoreCase(sideTableName)) {
114113

115-
equalFieldList.add(physicalFields.get(rightField));
114+
equalFieldList.add(rightField);
116115
int equalFieldIndex = -1;
117116
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
118117
String fieldName = rowTypeInfo.getFieldNames()[i];
@@ -144,7 +143,7 @@ public String getSelectFromStatement(String tableName, List<String> selectFields
144143
.collect(Collectors.joining(", "));
145144

146145
String whereClause = conditionFields.stream()
147-
.map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f))
146+
.map(f -> quoteIdentifier(sideTableInfo.getPhysicalFields().getOrDefault(f, f)) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f))
148147
.collect(Collectors.joining(" AND "));
149148

150149
String predicateClause = predicateInfoes.stream()

0 commit comments

Comments
 (0)