Skip to content

Commit 73ed644

Browse files
author
dapeng
committed
乱序,别名和hase
1 parent 2b0af47 commit 73ed644

File tree

4 files changed

+7
-8
lines changed

4 files changed

+7
-8
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,12 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
132132

133133
String leftTableName = left.getComponent(0).getSimple();
134134
String leftField = left.getComponent(1).getSimple();
135-
Map<String, String> physicalFields = sideTableInfo.getPhysicalFields();
136135

137136
String rightTableName = right.getComponent(0).getSimple();
138137
String rightField = right.getComponent(1).getSimple();
139138

140139
if(leftTableName.equalsIgnoreCase(sideTableName)){
141-
equalFieldList.add(physicalFields.get(leftField));
140+
equalFieldList.add(leftField);
142141
int equalFieldIndex = -1;
143142
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
144143
String fieldName = rowTypeInfo.getFieldNames()[i];
@@ -154,7 +153,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
154153

155154
}else if(rightTableName.equalsIgnoreCase(sideTableName)){
156155

157-
equalFieldList.add(physicalFields.get(rightField));
156+
equalFieldList.add(rightField);
158157
int equalFieldIndex = -1;
159158
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
160159
String fieldName = rowTypeInfo.getFieldNames()[i];

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public void open(Configuration parameters) throws Exception {
124124

125125
@Override
126126
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
127-
CRow inputCopy = new CRow(input.row(), input.change());
127+
CRow inputCopy = new CRow(Row.copy(input.row()), input.change());
128128
Map<String, Object> refData = Maps.newHashMap();
129129
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
130130
Integer conValIndex = sideInfo.getEqualValIndex().get(i);

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ private void dealField(Matcher matcher, TableInfo tableInfo){
100100
sideTableInfo.addFieldClass(fieldClass);
101101
sideTableInfo.addFieldType(fieldType);
102102
sideTableInfo.putAliasNameRef(aliasStr, fieldName);
103+
sideTableInfo.addPhysicalMappings(aliasStr, fieldName);
103104
}
104105

105106

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
8787

8888
SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
8989
SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];
90-
Map<String, String> physicalFields = sideTableInfo.getPhysicalFields();
9190

9291
String leftTableName = left.getComponent(0).getSimple();
9392
String leftField = left.getComponent(1).getSimple();
@@ -96,7 +95,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
9695
String rightField = right.getComponent(1).getSimple();
9796

9897
if (leftTableName.equalsIgnoreCase(sideTableName)) {
99-
equalFieldList.add(physicalFields.get(leftField));
98+
equalFieldList.add(leftField);
10099
int equalFieldIndex = -1;
101100
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
102101
String fieldName = rowTypeInfo.getFieldNames()[i];
@@ -112,7 +111,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
112111

113112
} else if (rightTableName.equalsIgnoreCase(sideTableName)) {
114113

115-
equalFieldList.add(physicalFields.get(rightField));
114+
equalFieldList.add(rightField);
116115
int equalFieldIndex = -1;
117116
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
118117
String fieldName = rowTypeInfo.getFieldNames()[i];
@@ -144,7 +143,7 @@ public String getSelectFromStatement(String tableName, List<String> selectFields
144143
.collect(Collectors.joining(", "));
145144

146145
String whereClause = conditionFields.stream()
147-
.map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f))
146+
.map(f -> quoteIdentifier(sideTableInfo.getPhysicalFields().getOrDefault(f, f)) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f))
148147
.collect(Collectors.joining(" AND "));
149148

150149
String predicateClause = predicateInfoes.stream()

0 commit comments

Comments
 (0)