Skip to content

Commit 05f10bc

Browse files
committed
fix asyn rdb write only one record
1 parent a7000cd commit 05f10bc

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
9090
return;
9191
} else if (ECacheContentType.MultiLine == val.getType()) {
9292

93+
List<Row> rowList = Lists.newArrayList();
9394
for (Object jsonArray : (List) val.getContent()) {
9495
Row row = fillData(input, jsonArray);
95-
resultFuture.complete(Collections.singleton(row));
96+
rowList.add(row);
9697
}
9798

99+
resultFuture.complete(rowList);
100+
98101
} else {
99102
throw new RuntimeException("not support cache obj type " + val.getType());
100103
}
@@ -122,18 +125,18 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
122125

123126
int resultSize = rs.result().getResults().size();
124127
if (resultSize > 0) {
128+
List<Row> rowList = Lists.newArrayList();
125129
for (JsonArray line : rs.result().getResults()) {
126-
127130
Row row = fillData(input, line);
128131
if (openCache()) {
129132
cacheContent.add(line);
130133
}
131-
resultFuture.complete(Collections.singleton(row));
134+
rowList.add(row);
132135
}
133-
134136
if (openCache()) {
135137
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
136138
}
139+
resultFuture.complete(rowList);
137140
} else {
138141
dealMissKey(input, resultFuture);
139142
if (openCache()) {

0 commit comments

Comments
 (0)