Skip to content

Commit d2c2ef3

Browse files
committed
Merge branch 'v1.5.0_dev_feature_redis' into 'v1.5.0_dev'
修改redis维表内连接问题 修改redis维表内连接问题 See merge request !123
2 parents 99fca76 + 4d8dbb7 commit d2c2ef3

File tree

1 file changed

+5
-9
lines changed

1 file changed

+5
-9
lines changed

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.lettuce.core.api.async.RedisStringAsyncCommands;
3232
import io.lettuce.core.cluster.RedisClusterClient;
3333
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
34+
import org.apache.calcite.sql.JoinType;
3435
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3536
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
3637
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
@@ -124,15 +125,11 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
124125
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
125126
Object equalObj = input.getField(conValIndex);
126127

127-
String value = "";
128-
129128
if(equalObj == null){
130-
resultFuture.complete(null);
131-
value = "null";
132-
} else {
133-
value = equalObj.toString();
129+
dealMissKey(input, resultFuture);
130+
return;
134131
}
135-
132+
String value = equalObj.toString();
136133
keyData.add(sideInfo.getEqualFieldList().get(i));
137134
keyData.add(value);
138135
}
@@ -165,8 +162,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
165162
List<String> value = async.keys(key + ":*").get();
166163
String[] values = value.toArray(new String[value.size()]);
167164
if (values.length == 0){
168-
Row row = fillData(input, null);
169-
resultFuture.complete(Collections.singleton(row));
165+
dealMissKey(input, resultFuture);
170166
} else {
171167
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
172168
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {

0 commit comments

Comments
 (0)