Skip to content

Commit 204cc39

Browse files
committed
sidetable primary key convert
1 parent 72a294e commit 204cc39

File tree

4 files changed

+18
-12
lines changed

4 files changed

+18
-12
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void parseSelectFields(JoinInfo joinInfo){
8989
for( int i=0; i<outFieldInfoList.size(); i++){
9090
FieldInfo fieldInfo = outFieldInfoList.get(i);
9191
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
92-
fields.add(fieldInfo.getFieldName());
92+
fields.add(sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName()));
9393
sideFieldIndex.put(i, sideIndex);
9494
sideFieldNameIndex.put(i, fieldInfo.getFieldName());
9595
sideIndex++;

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -463,17 +463,23 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
463463
* Analyzing conditions are very join the dimension tables include all equivalent conditions (i.e., dimension table is the primary key definition
464464
* @return
465465
*/
466-
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, List<String> primaryKeys){
467-
468-
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias);
469-
if(CollectionUtils.isEqualCollection(conditionFields, primaryKeys)){
466+
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, SideTableInfo sideTableInfo){
467+
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo);
468+
if(CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))){
470469
return true;
471470
}
472-
473471
return false;
474472
}
475473

476-
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName){
474+
private List<String> convertPrimaryAlias(SideTableInfo sideTableInfo){
475+
List<String> res = Lists.newArrayList();
476+
sideTableInfo.getPrimaryKeys().forEach(field -> {
477+
res.add(sideTableInfo.getPhysicalFields().getOrDefault(field, field));
478+
});
479+
return res;
480+
}
481+
482+
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName, SideTableInfo sideTableInfo){
477483
List<SqlNode> sqlNodeList = Lists.newArrayList();
478484
ParseUtils.parseAnd(conditionNode, sqlNodeList);
479485
List<String> conditionFields = Lists.newArrayList();
@@ -496,7 +502,7 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
496502
}else{
497503
throw new RuntimeException(String.format("side table:%s join condition is wrong", specifyTableName));
498504
}
499-
505+
tableCol = sideTableInfo.getPhysicalFields().getOrDefault(tableCol, tableCol);
500506
conditionFields.add(tableCol);
501507
}
502508

@@ -590,7 +596,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
590596
throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName());
591597
}
592598

593-
if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo.getPrimaryKeys())){
599+
if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
594600
throw new RuntimeException("ON condition must contain all equal fields!!!");
595601
}
596602

@@ -616,7 +622,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
616622

617623
//join side table before keyby ===> Reducing the size of each dimension table cache of async
618624
if(sideTableInfo.isPartitionedJoin()){
619-
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias());
625+
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias(), sideTableInfo);
620626
String[] leftJoinColArr = new String[leftJoinColList.size()];
621627
leftJoinColArr = leftJoinColList.toArray(leftJoinColArr);
622628
adaptStream = adaptStream.keyBy(leftJoinColArr);

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5656

5757
sqlCondition = "select ${selectField} from ${tableName} where ";
5858
for (int i = 0; i < equalFieldList.size(); i++) {
59-
String equalField = equalFieldList.get(i);
59+
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
6060

6161
sqlCondition += dealLowerFiled(equalField) + "=? ";
6262
if (i != equalFieldList.size() - 1) {

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
6666

6767
sqlCondition = "select ${selectField} from ${tableName} where ";
6868
for (int i = 0; i < equalFieldList.size(); i++) {
69-
String equalField = equalFieldList.get(i);
69+
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
7070

7171
sqlCondition += equalField + "=? ";
7272
if (i != equalFieldList.size() - 1) {

0 commit comments

Comments
 (0)