Skip to content

Commit 228634d

Browse files
committed
refresh tablet_id if the size of partIdMap is more than 1, do not directly return
1 parent 5fed0eb commit 228634d

File tree

2 files changed

+18
-14
lines changed

2 files changed

+18
-14
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@
2929
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
3030
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
3131
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
32-
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
3332
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
3433
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregation;
3534
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate;
3635
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateRequest;
3736
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateResult;
38-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObBorderFlag;
3937
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
4038
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
4139
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryRequest;
@@ -48,7 +46,6 @@
4846
import com.alipay.remoting.util.StringUtils;
4947
import org.slf4j.Logger;
5048

51-
import java.lang.reflect.Array;
5249
import java.util.*;
5350
import java.util.concurrent.ConcurrentHashMap;
5451
import java.util.concurrent.TimeUnit;
@@ -710,15 +707,18 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
710707
if (refreshedTableMeta) {
711708
refreshedTableMeta = false;
712709
boolean isHKV = callback.getQuery().getEntityType() == ObTableEntityType.HKV;
713-
tableRoute.refreshTabletLocationForAtomicQuery(tableName, callback.getQuery().getObTableQuery(), isHKV);
710+
Map<Long, ObTableParam> partIdMapObTable = tableRoute.refreshTabletLocationAndGetPartIdMap(tableName, callback.getQuery().getObTableQuery(), isHKV);
711+
tableParam = partIdMapObTable.entrySet().iterator().next().getValue();
714712
} else {
715713
tableRoute.refreshPartitionLocation(tableName, routeQueryTabletId, null);
716714
}
717715
}
718-
ObTableQuery tableQuery = callback.getQuery().getObTableQuery();
719-
// using scan range
720-
tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(),
721-
tableQuery.getKeyRanges());
716+
if (tableParam == null) {
717+
ObTableQuery tableQuery = callback.getQuery().getObTableQuery();
718+
// using scan range
719+
tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(),
720+
tableQuery.getKeyRanges());
721+
}
722722
routeQueryTabletId = tableParam.getPartitionId();
723723
} else {
724724
throw new ObTableException("RowKey or scan range is null");
@@ -2317,21 +2317,24 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23172317
request.setTimeout(getOdpTable().getObTableOperationTimeout());
23182318
return getOdpTable().execute(request);
23192319
} else {
2320+
Map<Long, ObTableParam> partIdMapObTable = null;
23202321
// Recalculate partIdMapObTable
23212322
if (needRefreshTabletLocation) {
23222323
needRefreshTabletLocation = false;
23232324
if (refreshedTableMeta) {
23242325
refreshedTableMeta = false;
23252326
// need to recalculate routing tablet_id and refresh location
23262327
boolean isHKV = request.getEntityType() == ObTableEntityType.HKV;
2327-
tableRoute.refreshTabletLocationForAtomicQuery(request.getTableName(), tableQuery, isHKV);
2328+
partIdMapObTable = tableRoute.refreshTabletLocationAndGetPartIdMap(request.getTableName(), tableQuery, isHKV);
23282329
} else {
23292330
tableRoute.refreshPartitionLocation(request.getTableName(), routeTabletId, null);
23302331
}
23312332
}
2332-
Map<Long, ObTableParam> partIdMapObTable = tableRoute.getPartIdParamMapForQuery(
2333-
request.getTableName(), tableQuery.getScanRangeColumns(),
2334-
tableQuery.getKeyRanges());
2333+
if (partIdMapObTable == null) {
2334+
partIdMapObTable = tableRoute.getPartIdParamMapForQuery(
2335+
request.getTableName(), tableQuery.getScanRangeColumns(),
2336+
tableQuery.getKeyRanges());
2337+
}
23352338

23362339
// Check if partIdMapObTable size is greater than 1
23372340
boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute();

src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ public TableEntry refreshTabletLocationBatch(String tableName) throws Exception
579579
}
580580
}
581581

582-
public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery query, boolean isHKV) throws Exception {
582+
public Map<Long, ObTableParam> refreshTabletLocationAndGetPartIdMap(String tableName, ObTableQuery query, boolean isHKV) throws Exception {
583583
Map<Long, ObTableParam> partIdParamMap = getPartIdParamMapForQuery(tableName, query.getScanRangeColumns(), query.getKeyRanges());
584584
if (isHKV) {
585585
// for HBase process, if distributed function is enabled, no need to do routing refresh
@@ -588,7 +588,7 @@ public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery q
588588
throw new ObTablePartitionConsistentException(
589589
"query and mutate must be a atomic operation");
590590
} else if (isDistributedSupported) {
591-
return;
591+
// do nothing
592592
}
593593
} else {
594594
// for table process, distributed function is not supported yet, need to refresh routing
@@ -604,6 +604,7 @@ public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery q
604604
TableEntry tableEntry = getTableEntry(tableName);
605605
long tabletId = entry.getValue().getTabletId();
606606
refreshPartitionLocation(tableName, tabletId, tableEntry);
607+
return partIdParamMap;
607608
}
608609

609610
private Long[] getTabletsFromTableEntry(TableEntry tableEntry) {

0 commit comments

Comments
 (0)