Skip to content

Commit 075064a

Browse files
committed
fix rdb asynside bug
1 parent ea8cee4 commit 075064a

File tree

1 file changed

+5
-6
lines changed

1 file changed

+5
-6
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,15 @@
2626
import io.vertx.core.json.JsonArray;
2727
import io.vertx.ext.sql.SQLClient;
2828
import io.vertx.ext.sql.SQLConnection;
29-
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
3029
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3130
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
32-
import org.apache.flink.configuration.Configuration;
3331
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3432
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3533
import org.apache.flink.types.Row;
3634
import org.slf4j.Logger;
3735
import org.slf4j.LoggerFactory;
3836

39-
import java.math.BigInteger;
4037
import java.sql.Timestamp;
41-
import java.util.Collections;
4238
import java.util.List;
4339
import java.util.Map;
4440

@@ -122,18 +118,21 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
122118

123119
int resultSize = rs.result().getResults().size();
124120
if (resultSize > 0) {
125-
for (JsonArray line : rs.result().getResults()) {
121+
List<Row> rowList = Lists.newArrayList();
126122

123+
for (JsonArray line : rs.result().getResults()) {
127124
Row row = fillData(input, line);
128125
if (openCache()) {
129126
cacheContent.add(line);
130127
}
131-
resultFuture.complete(Collections.singleton(row));
128+
rowList.add(row);
132129
}
133130

134131
if (openCache()) {
135132
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
136133
}
134+
135+
resultFuture.complete(rowList);
137136
} else {
138137
dealMissKey(input, resultFuture);
139138
if (openCache()) {

0 commit comments

Comments
 (0)