Skip to content

Commit 4d8dbb7

Browse files
修竹修竹
authored andcommitted
解决redis async side内连接问题
1 parent 1703b02 commit 4d8dbb7

File tree

1 file changed

+4
-12
lines changed

1 file changed

+4
-12
lines changed

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,11 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
125125
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
126126
Object equalObj = input.getField(conValIndex);
127127

128-
String value = "";
129-
130128
if(equalObj == null){
131-
resultFuture.complete(null);
132-
value = "null";
133-
} else {
134-
value = equalObj.toString();
129+
dealMissKey(input, resultFuture);
130+
return;
135131
}
136-
132+
String value = equalObj.toString();
137133
keyData.add(sideInfo.getEqualFieldList().get(i));
138134
keyData.add(value);
139135
}
@@ -166,11 +162,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
166162
List<String> value = async.keys(key + ":*").get();
167163
String[] values = value.toArray(new String[value.size()]);
168164
if (values.length == 0){
169-
if (sideInfo.getJoinType() != JoinType.LEFT) {
170-
return;
171-
}
172-
Row row = fillData(input, null);
173-
resultFuture.complete(Collections.singleton(row));
165+
dealMissKey(input, resultFuture);
174166
} else {
175167
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
176168
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {

0 commit comments

Comments
 (0)