Skip to content

Commit f89dd6c

Browse files
committed
[Fix] hbase multi partition scan return repetitive data
1 parent 3790dba commit f89dd6c

File tree

6 files changed

+80
-44
lines changed

6 files changed

+80
-44
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,4 +270,8 @@ public TableQuery setSearchText(String searchText) {
270270
tableClientQuery.setSearchText(searchText);
271271
return this;
272272
}
273+
274+
public void setAllowDistributeScan(boolean allowDistributeScan) {
275+
tableClientQuery.setAllowDistributeScan(allowDistributeScan);
276+
}
273277
}

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3777,7 +3777,9 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
37773777
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
37783778
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
37793779
tableQuery.setEntityType(request.getEntityType());
3780-
return new ObClusterTableQuery(tableQuery).asyncExecuteInternal();
3780+
ObClusterTableQuery clusterTableQuery = new ObClusterTableQuery(tableQuery);
3781+
clusterTableQuery.setAllowDistributeScan(((ObTableQueryAsyncRequest) request).isAllowDistributeScan());
3782+
return clusterTableQuery.asyncExecuteInternal();
37813783
} else if (request instanceof ObTableBatchOperationRequest) {
37823784
ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl(
37833785
request.getTableName(),

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObTableQueryAsyncRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class ObTableQueryAsyncRequest extends ObTableAbstractOperationRequest {
4040
private long querySessionId;
4141
private ObQueryOperationType queryType = ObQueryOperationType.QUERY_START;
4242

43+
private boolean allowDistributeScan;
44+
4345
/**
4446
* Get pcode.
4547
*/
@@ -124,4 +126,12 @@ public void setObTableQueryRequest(ObTableQueryRequest obTableQueryRequest) {
124126
this.obTableQueryRequest = obTableQueryRequest;
125127
}
126128

129+
public void setAllowDistributeScan(boolean allowDistributeScan) {
130+
this.allowDistributeScan = allowDistributeScan;
131+
}
132+
133+
public boolean isAllowDistributeScan() {
134+
return allowDistributeScan;
135+
}
136+
127137
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@
4040
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;
4141

4242
public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult {
43-
private static final Logger logger = LoggerFactory
44-
.getLogger(ObTableClientQueryStreamResult.class);
45-
private boolean isEnd = true;
46-
private long sessionId = Constants.OB_INVALID_ID;
47-
private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest();
48-
private ObTableConnection prevConnection = null;
43+
private static final Logger logger = LoggerFactory
44+
.getLogger(ObTableClientQueryStreamResult.class);
45+
private boolean isEnd = true;
46+
private long sessionId = Constants.OB_INVALID_ID;
47+
private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest();
48+
private ObTableConnection prevConnection = null;
49+
private boolean allowDistributeScan = true; // false when partition scan
4950

5051
@Override
5152
public void init() throws Exception {
@@ -113,8 +114,7 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
113114
throws Exception {
114115
ObTableParam obTableParam = partIdWithObTable.getRight();
115116
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
116-
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
117-
: obTableParam.getPartitionId();
117+
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
118118
// refresh request info
119119
queryRequest.setPartitionId(partitionId);
120120
queryRequest.setTableId(obTableParam.getTableId());
@@ -141,8 +141,7 @@ protected ObTableQueryAsyncResult referToLastStreamResult(ObPair<Long, ObTablePa
141141
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
142142

143143
// refresh request info
144-
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
145-
: obTableParam.getPartitionId();
144+
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
146145
queryRequest.setPartitionId(partitionId);
147146
queryRequest.setTableId(obTableParam.getTableId());
148147

@@ -162,8 +161,7 @@ protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTabl
162161
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
163162

164163
// refresh request info
165-
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
166-
: obTableParam.getPartitionId();
164+
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
167165
queryRequest.setPartitionId(partitionId);
168166
queryRequest.setTableId(obTableParam.getTableId());
169167

@@ -200,8 +198,8 @@ public void renewLease() throws Exception {
200198
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
201199

202200
// refresh request info
203-
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
204-
: obTableParam.getPartitionId();
201+
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam
202+
.getPartitionId();
205203
queryRequest.setPartitionId(partitionId);
206204
queryRequest.setTableId(obTableParam.getTableId());
207205

@@ -404,4 +402,12 @@ public boolean isEnd() {
404402
public void setEnd(boolean end) {
405403
isEnd = end;
406404
}
405+
406+
private boolean isDistributeScan() {
407+
return allowDistributeScan && client.getServerCapacity().isSupportDistributedExecute();
408+
}
409+
410+
public void setAllowDistributeScan(boolean allowDistributeScan) {
411+
this.allowDistributeScan = allowDistributeScan;
412+
}
407413
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public class ObTableClientQueryImpl extends AbstractTableQueryImpl {
4848
private final ObTableClient obTableClient;
4949
private Map<Long, ObPair<Long, ObTableParam>> partitionObTables;
5050

51-
private Row rowKey; // only used by BatchOperation
51+
private Row rowKey; // only used by BatchOperation
52+
53+
private boolean allowDistributeScan;
5254

5355
/*
5456
* Add aggregation.
@@ -286,6 +288,7 @@ public ObTableClientQueryAsyncStreamResult asyncExecuteInternal() throws Excepti
286288
@Override
287289
ObTableClientQueryAsyncStreamResult execute() throws Exception {
288290
ObTableClientQueryAsyncStreamResult obTableClientQueryAsyncStreamResult = new ObTableClientQueryAsyncStreamResult();
291+
obTableClientQueryAsyncStreamResult.setAllowDistributeScan(allowDistributeScan);
289292
setCommonParams2Result(obTableClientQueryAsyncStreamResult);
290293
obTableClientQueryAsyncStreamResult.setClient(obTableClient);
291294
obTableClientQueryAsyncStreamResult.init();
@@ -427,4 +430,8 @@ public void setPartId(Long partId) {
427430
public Long getPartId() {
428431
return getObTableQuery().getPartId();
429432
}
433+
434+
public void setAllowDistributeScan(boolean allowDistributeScan) {
435+
this.allowDistributeScan = allowDistributeScan;
436+
}
430437
}

src/test/java/com/alipay/oceanbase/rpc/ObTableDatetimeTest.java

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
118
package com.alipay.oceanbase.rpc;
219

320
import com.alipay.oceanbase.rpc.exception.ObTableException;
@@ -58,13 +75,9 @@ public void testSingle() throws Exception {
5875
Date date = sdf.parse(dateString);
5976
try {
6077
Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L));
61-
client.insertOrUpdate(tableName).setRowKey(rk)
62-
.addMutateColVal(colVal("c4", "c4_val"))
63-
.execute();
64-
Map<String, Object> res = client.get(tableName)
65-
.setRowKey(rk)
66-
.select("c4")
67-
.execute();
78+
client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val"))
79+
.execute();
80+
Map<String, Object> res = client.get(tableName).setRowKey(rk).select("c4").execute();
6881
Assert.assertEquals("c4_val", res.get("c4"));
6982

7083
client.delete(tableName).setRowKey(rk).execute();
@@ -86,9 +99,9 @@ public void testBatch() throws Exception {
8699
Row rk2 = row(colVal("c1", 1L), colVal("c2", date2), colVal("c3", 1L));
87100
BatchOperation batch = client.batchOperation(tableName);
88101
InsertOrUpdate insUp1 = client.insertOrUpdate(tableName).setRowKey(rk1)
89-
.addMutateColVal(colVal("c4", "c4_val"));
102+
.addMutateColVal(colVal("c4", "c4_val"));
90103
InsertOrUpdate insUp2 = client.insertOrUpdate(tableName).setRowKey(rk2)
91-
.addMutateColVal(colVal("c4", "c4_val"));
104+
.addMutateColVal(colVal("c4", "c4_val"));
92105
batch.addOperation(insUp1, insUp2);
93106
BatchOperationResult res = batch.execute();
94107
Assert.assertNotNull(res);
@@ -110,15 +123,13 @@ public void testPkQuery() throws Exception {
110123
Date date = sdf.parse(dateString);
111124
try {
112125
Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L));
113-
client.insertOrUpdate(tableName).setRowKey(rk)
114-
.addMutateColVal(colVal("c4", "c4_val"))
115-
.execute();
116-
117-
QueryResultSet res = client.query(tableName)
118-
.select("c4")
119-
.setScanRangeColumns("c1", "c2", "c3")
120-
.addScanRange(new Object[]{1L, date, 1L}, new Object[]{1L, date, 1L})
121-
.execute();
126+
client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val"))
127+
.execute();
128+
129+
QueryResultSet res = client.query(tableName).select("c4")
130+
.setScanRangeColumns("c1", "c2", "c3")
131+
.addScanRange(new Object[] { 1L, date, 1L }, new Object[] { 1L, date, 1L })
132+
.execute();
122133
Assert.assertTrue(res.next());
123134
Assert.assertEquals("c4_val", res.getRow().get("c4"));
124135

@@ -136,16 +147,12 @@ public void testIndexQuery() throws Exception {
136147
Date date = sdf.parse(dateString);
137148
try {
138149
Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L));
139-
client.insertOrUpdate(tableName).setRowKey(rk)
140-
.addMutateColVal(colVal("c4", "c4_val"))
141-
.execute();
142-
143-
QueryResultSet res = client.query(tableName)
144-
.indexName("idx_c2")
145-
.select("c4")
146-
.setScanRangeColumns("c1", "c2")
147-
.addScanRange(new Object[]{1L, date}, new Object[]{1L, date})
148-
.execute();
150+
client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val"))
151+
.execute();
152+
153+
QueryResultSet res = client.query(tableName).indexName("idx_c2").select("c4")
154+
.setScanRangeColumns("c1", "c2")
155+
.addScanRange(new Object[] { 1L, date }, new Object[] { 1L, date }).execute();
149156
Assert.assertTrue(res.next());
150157
Assert.assertEquals("c4_val", res.getRow().get("c4"));
151158

0 commit comments

Comments
 (0)