Skip to content

Commit dbfe5ee

Browse files
committed
fix redis async cache error
1 parent a40701f commit dbfe5ee

File tree

2 files changed

+8
-14
lines changed

2 files changed

+8
-14
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,7 @@
4545
import org.apache.calcite.sql.parser.SqlParseException;
4646
import org.apache.calcite.sql.parser.SqlParserPos;
4747
import org.apache.commons.collections.CollectionUtils;
48-
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
4948
import org.apache.flink.api.common.typeinfo.TypeInformation;
50-
import org.apache.flink.api.java.functions.KeySelector;
5149
import org.apache.flink.api.java.tuple.Tuple2;
5250
import org.apache.flink.api.java.typeutils.RowTypeInfo;
5351
import com.google.common.collect.HashBasedTable;
@@ -61,11 +59,12 @@
6159
import org.apache.flink.types.Row;
6260
import org.slf4j.Logger;
6361
import org.slf4j.LoggerFactory;
64-
65-
import java.io.Serializable;
6662
import java.sql.Timestamp;
67-
import java.util.*;
68-
import java.util.stream.Collectors;
63+
import java.util.Collection;
64+
import java.util.LinkedList;
65+
import java.util.List;
66+
import java.util.Map;
67+
import java.util.Queue;
6968

7069
import static org.apache.calcite.sql.SqlKind.*;
7170

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
135135
if(openCache()){
136136
CacheObj val = getFromCache(key);
137137
if(val != null){
138-
139138
if(ECacheContentType.MissVal == val.getType()){
140139
dealMissKey(input, resultFuture);
141140
return;
142141
}else if(ECacheContentType.MultiLine == val.getType()){
143-
List<Row> rowList = Lists.newArrayList();
144-
for (Object jsonArray : (List) val.getContent()) {
145-
Row row = fillData(input, val.getContent());
146-
rowList.add(row);
147-
}
148-
resultFuture.complete(rowList);
142+
Row row = fillData(input, val.getContent());
143+
resultFuture.complete(Collections.singleton(row));
149144
}else{
150145
RuntimeException exception = new RuntimeException("not support cache obj type " + val.getType());
151146
resultFuture.completeExceptionally(exception);
@@ -171,10 +166,10 @@ public void accept(List<KeyValue<String, String>> keyValues) {
171166
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
172167
}
173168
Row row = fillData(input, keyValue);
174-
resultFuture.complete(Collections.singleton(row));
175169
if (openCache()) {
176170
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
177171
}
172+
resultFuture.complete(Collections.singleton(row));
178173
} else {
179174
dealMissKey(input, resultFuture);
180175
if (openCache()) {

0 commit comments

Comments
 (0)