Skip to content

Commit 56165d8

Browse files
committed
Merge branch 'v1.8.0_dev_bugfix_hbase' into 'v1.8.0_dev'
流表前缀扫描的方式join hbase维表时,如果扫描到了多条记录,只会输入一条记录 See merge request !119
2 parents 8649b8b + 3fe1eb7 commit 56165d8

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
8888
}
8989

9090
List<Object> cacheContent = Lists.newArrayList();
91+
List<Row> rowList = Lists.newArrayList();
9192

9293
for(List<KeyValue> oneRow : args){
9394
try {
@@ -120,8 +121,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
120121
if (openCache) {
121122
cacheContent.add(sideVal);
122123
}
123-
124-
resultFuture.complete(Collections.singleton(row));
124+
rowList.add(row);
125125
}
126126
} catch (Exception e) {
127127
resultFuture.complete(null);
@@ -130,6 +130,10 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
130130
}
131131
}
132132

133+
if (rowList.size() > 0){
134+
resultFuture.complete(rowList);
135+
}
136+
133137
if(openCache){
134138
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
135139
}

0 commit comments

Comments
 (0)