Skip to content

Commit 3de8aa8

Browse files
committed
fix scan/get more result after meta refreshing
1 parent 828c1a4 commit 3de8aa8

File tree

3 files changed

+46
-4
lines changed

3 files changed

+46
-4
lines changed

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.atomic.AtomicReference;
4747
import java.util.concurrent.locks.ReentrantLock;
4848

49+
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.INVALID_TABLET_ID;
4950
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;
5051

5152
public abstract class AbstractQueryStreamResult extends AbstractPayload implements
@@ -412,7 +413,7 @@ public boolean next() throws Exception {
412413
}
413414
}
414415

415-
protected Map<Long, ObPair<Long, ObTableParam>> buildPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
416+
protected Map<Long, ObPair<Long, ObTableParam>> buildAllPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
416417
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new LinkedHashMap<>();
417418
String indexName = tableQuery.getIndexName();
418419
if (!client.isOdpMode()) {
@@ -456,6 +457,40 @@ protected Map<Long, ObPair<Long, ObTableParam>> buildPartitions(ObTableClient cl
456457
return partitionObTables;
457458
}
458459

460+
protected Map<Long, ObPair<Long, ObTableParam>> buildFirstPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
461+
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new LinkedHashMap<>();
462+
String indexName = tableQuery.getIndexName();
463+
464+
if (!this.client.isOdpMode()) {
465+
indexTableName = client.getIndexTableName(tableName, indexName, tableQuery.getScanRangeColumns(), false);
466+
}
467+
468+
if (tableQuery.getKeyRanges().isEmpty()) {
469+
throw new IllegalArgumentException("query ranges is empty");
470+
} else {
471+
ObNewRange range = tableQuery.getKeyRanges().get(0);
472+
ObRowKey startKey = range.getStartKey();
473+
int startKeySize = startKey.getObjs().size();
474+
Object[] start = new Object[startKeySize];
475+
476+
for (int i = 0; i < startKeySize; i++) {
477+
start[i] = startKey.getObj(i).isMinObj() || startKey.getObj(i).isMaxObj() ?
478+
startKey.getObj(i) : startKey.getObj(i).getValue();
479+
}
480+
481+
if (this.entityType == ObTableEntityType.HKV && client.isTableGroupName(tableName)) {
482+
indexTableName = client.tryGetTableNameFromTableGroupCache(tableName, false);
483+
}
484+
ObBorderFlag borderFlag = range.getBorderFlag();
485+
List<ObTableParam> params = this.client.getTableParams(indexTableName, tableQuery, start,
486+
borderFlag.isInclusiveStart(), start, borderFlag.isInclusiveEnd());
487+
488+
partitionObTables.put(INVALID_TABLET_ID, new ObPair<>(params.get(0).getPartId(), params.get(0)));
489+
}
490+
491+
return partitionObTables;
492+
}
493+
459494
protected void nextRow() {
460495
rowIndex = rowIndex + 1;
461496
row = cacheRows.poll();

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,11 @@ public boolean queryNewStreamResultInNext() throws Exception {
278278
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery,
279279
String tableName)
280280
throws Exception {
281-
return buildPartitions(client, tableQuery, tableName);
281+
if (isDistributeScan()) {
282+
return buildFirstPartitions(client, tableQuery, tableName);
283+
} else {
284+
return buildAllPartitions(client, tableQuery, tableName);
285+
}
282286
}
283287

284288
// This function is designed for HBase-type requests.

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.alipay.oceanbase.rpc.stream;
1919

20-
import com.alipay.oceanbase.rpc.ObTableClient;
2120
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2221
import com.alipay.oceanbase.rpc.exception.ObTableEntryRefreshException;
2322
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
@@ -114,7 +113,11 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
114113
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery,
115114
String tableName)
116115
throws Exception {
117-
return buildPartitions(client, tableQuery, tableName);
116+
if (client.getServerCapacity().isSupportDistributedExecute()) {
117+
return buildFirstPartitions(client, tableQuery, tableName);
118+
} else {
119+
return buildAllPartitions(client, tableQuery, tableName);
120+
}
118121
}
119122

120123
private boolean needTabletId(ObTableQueryRequest request) {

0 commit comments

Comments
 (0)