Skip to content

Commit 1232cb7

Browse files
committed
refresh tablet location for atomic query
1 parent 05900c8 commit 1232cb7

File tree

2 files changed

+93
-114
lines changed

2 files changed

+93
-114
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
477477
tableParam = new ObTableParam(odpTable);
478478
} else {
479479
if (tryTimes > 1 && needRefreshPartitionLocation) {
480+
needRefreshPartitionLocation = false;
480481
// refresh partition location
481482
TableEntry entry = tableRoute.getTableEntry(tableName);
482483
long partId = tableRoute.getPartId(entry, rowKey);
@@ -687,6 +688,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
687688
} else {
688689
if (null != callback.getRowKey()) {
689690
if (tryTimes > 1 && needRefreshPartitionLocation) {
691+
needRefreshPartitionLocation = false;
690692
// refresh partition location
691693
TableEntry entry = tableRoute.getTableEntry(tableName);
692694
long partId = tableRoute.getPartId(entry, callback.getRowKey());
@@ -696,6 +698,11 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
696698
// using row key
697699
tableParam = tableRoute.getTableParamWithRoute(tableName, callback.getRowKey(), route);
698700
} else if (null != callback.getQuery()) {
701+
if (tryTimes > 1 && needRefreshPartitionLocation) {
702+
needRefreshPartitionLocation = false;
703+
boolean isHKV = callback.getQuery().getEntityType() == ObTableEntityType.HKV;
704+
tableRoute.refreshTabletLocationForAtomicQuery(tableName, callback.getQuery().getObTableQuery(), isHKV);
705+
}
699706
ObTableQuery tableQuery = callback.getQuery().getObTableQuery();
700707
// using scan range
701708
tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(),
@@ -2260,8 +2267,8 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
22602267
return getOdpTable().execute(request);
22612268
} else {
22622269
int tryTimes = 0;
2270+
boolean needRefreshTabletLocation = false;
22632271
long startExecute = System.currentTimeMillis();
2264-
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
22652272
while (true) {
22662273
long currentExecute = System.currentTimeMillis();
22672274
long costMillis = currentExecute - startExecute;
@@ -2278,40 +2285,14 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
22782285
}
22792286
try {
22802287
// Recalculate partIdMapObTable
2281-
// Clear the map before recalculating
2282-
partIdMapObTable.clear();
2283-
for (ObNewRange rang : tableQuery.getKeyRanges()) {
2284-
ObRowKey startKey = rang.getStartKey();
2285-
int startKeySize = startKey.getObjs().size();
2286-
ObRowKey endKey = rang.getEndKey();
2287-
int endKeySize = endKey.getObjs().size();
2288-
Object[] start = new Object[startKeySize];
2289-
Object[] end = new Object[endKeySize];
2290-
for (int i = 0; i < startKeySize; i++) {
2291-
ObObj curStart = startKey.getObj(i);
2292-
if (curStart.isMinObj()) {
2293-
start[i] = curStart;
2294-
} else {
2295-
start[i] = curStart.getValue();
2296-
}
2297-
}
2298-
2299-
for (int i = 0; i < endKeySize; i++) {
2300-
ObObj curEnd = endKey.getObj(i);
2301-
if (curEnd.isMaxObj()) {
2302-
end[i] = curEnd;
2303-
} else {
2304-
end[i] = curEnd.getValue();
2305-
}
2306-
}
2307-
ObBorderFlag borderFlag = rang.getBorderFlag();
2308-
List<ObTableParam> params = getTableParams(request.getTableName(),
2309-
tableQuery, start, borderFlag.isInclusiveStart(), end,
2310-
borderFlag.isInclusiveEnd());
2311-
for (ObTableParam param : params) {
2312-
partIdMapObTable.put(param.getPartId(), param);
2313-
}
2288+
if (needRefreshTabletLocation) {
2289+
needRefreshTabletLocation = false;
2290+
boolean isHKV = request.getEntityType() == ObTableEntityType.HKV;
2291+
tableRoute.refreshTabletLocationForAtomicQuery(request.getTableName(), tableQuery, isHKV);
23142292
}
2293+
Map<Long, ObTableParam> partIdMapObTable = tableRoute.getPartIdParamMapForQuery(
2294+
request.getTableName(), tableQuery.getScanRangeColumns(),
2295+
tableQuery.getKeyRanges());
23152296

23162297
// Check if partIdMapObTable size is greater than 1
23172298
boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute();
@@ -2344,9 +2325,12 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23442325
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(),
23452326
tryTimes, ex);
23462327

2347-
if (ex instanceof ObTableNeedFetchMetaException) {
2348-
// Refresh table info
2349-
refreshMeta(request.getTableName());
2328+
if (((ObTableException) ex).isNeedRefreshTableEntry()) {
2329+
needRefreshTabletLocation = true;
2330+
if (ex instanceof ObTableNeedFetchMetaException) {
2331+
// Refresh table info
2332+
refreshMeta(request.getTableName());
2333+
}
23502334
}
23512335
} else {
23522336
calculateContinuousFailure(request.getTableName(), ex.getMessage());

src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java

Lines changed: 72 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,33 @@ public TableEntry refreshTabletLocationBatch(String tableName) throws Exception
580580
}
581581
}
582582

583+
public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery query, boolean isHKV) throws Exception {
584+
Map<Long, ObTableParam> partIdParamMap = getPartIdParamMapForQuery(tableName, query.getScanRangeColumns(), query.getKeyRanges());
585+
if (isHKV) {
586+
// for HBase process, if distributed function is enabled, no need to do routing refresh
587+
boolean isDistributedSupported = getServerCapacity().isSupportDistributedExecute();
588+
if (partIdParamMap.size() > 1 && !isDistributedSupported) {
589+
throw new ObTablePartitionConsistentException(
590+
"query and mutate must be a atomic operation");
591+
} else if (isDistributedSupported) {
592+
return;
593+
}
594+
} else {
595+
// for table process, distributed function is not supported yet, need to refresh routing
596+
// for now only support to query single tablet
597+
if (partIdParamMap.size() > 1) {
598+
throw new ObTablePartitionConsistentException(
599+
"query and mutate must be a atomic operation");
600+
} else if (partIdParamMap.isEmpty()) {
601+
throw new ObTableException("could not find part id of range");
602+
}
603+
}
604+
Map.Entry<Long, ObTableParam> entry = partIdParamMap.entrySet().iterator().next();
605+
TableEntry tableEntry = getTableEntry(tableName);
606+
long tabletId = entry.getValue().getTabletId();
607+
refreshPartitionLocation(tableName, tabletId, tableEntry);
608+
}
609+
583610
private Long[] getTabletsFromTableEntry(TableEntry tableEntry) {
584611
Long[] tablets = null;
585612
if (tableEntry.isPartitionTable()) {
@@ -693,6 +720,44 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry,
693720
return obPartitionLocationInfo;
694721
}
695722

723+
public Map<Long, ObTableParam> getPartIdParamMapForQuery(String tableName, List<String> scanRangeColumns,
724+
List<ObNewRange> keyRanges) throws Exception {
725+
Map<Long, ObTableParam> parIdParamMapObTable = new HashMap<Long, ObTableParam>();
726+
for (ObNewRange keyRange : keyRanges) {
727+
ObRowKey startKey = keyRange.getStartKey();
728+
int startKeySize = startKey.getObjs().size();
729+
ObRowKey endKey = keyRange.getEndKey();
730+
int endKeySize = endKey.getObjs().size();
731+
Object[] start = new Object[startKeySize];
732+
Object[] end = new Object[endKeySize];
733+
for (int i = 0; i < startKeySize; i++) {
734+
ObObj curStart = startKey.getObj(i);
735+
if (curStart.isMinObj()) {
736+
start[i] = curStart;
737+
} else {
738+
start[i] = curStart.getValue();
739+
}
740+
}
741+
742+
for (int i = 0; i < endKeySize; i++) {
743+
ObObj curEnd = endKey.getObj(i);
744+
if (curEnd.isMaxObj()) {
745+
end[i] = curEnd;
746+
} else {
747+
end[i] = curEnd.getValue();
748+
}
749+
}
750+
ObBorderFlag borderFlag = keyRange.getBorderFlag();
751+
List<ObTableParam> paramList = getTablesInternal(tableName, scanRangeColumns, start,
752+
borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
753+
tableClient.getRoute(false));
754+
for (ObTableParam param : paramList) {
755+
parIdParamMapObTable.put(param.getPartId(), param);
756+
}
757+
}
758+
return parIdParamMapObTable;
759+
}
760+
696761
/**
697762
* get addr by partId
698763
* @param tableName table want to get
@@ -869,50 +934,18 @@ public ObTableParam getTableParam(String tableName, List<String> scanRangeColumn
869934
*/
870935
public ObTableParam getTableParam(String tableName, List<String> scanRangeColumns,
871936
List<ObNewRange> keyRanges) throws Exception {
872-
Map<Long, ObTableParam> tabletIdIdMapObTable = new HashMap<Long, ObTableParam>();
873-
for (ObNewRange keyRange : keyRanges) {
874-
ObRowKey startKey = keyRange.getStartKey();
875-
int startKeySize = startKey.getObjs().size();
876-
ObRowKey endKey = keyRange.getEndKey();
877-
int endKeySize = endKey.getObjs().size();
878-
Object[] start = new Object[startKeySize];
879-
Object[] end = new Object[endKeySize];
880-
for (int i = 0; i < startKeySize; i++) {
881-
ObObj curStart = startKey.getObj(i);
882-
if (curStart.isMinObj()) {
883-
start[i] = curStart;
884-
} else {
885-
start[i] = curStart.getValue();
886-
}
887-
}
888-
889-
for (int i = 0; i < endKeySize; i++) {
890-
ObObj curEnd = endKey.getObj(i);
891-
if (curEnd.isMaxObj()) {
892-
end[i] = curEnd;
893-
} else {
894-
end[i] = curEnd.getValue();
895-
}
896-
}
897-
ObBorderFlag borderFlag = keyRange.getBorderFlag();
898-
List<ObTableParam> paramList = getTablesInternal(tableName, scanRangeColumns, start,
899-
borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
900-
tableClient.getRoute(false));
901-
for (ObTableParam param : paramList) {
902-
tabletIdIdMapObTable.put(param.getTabletId(), param);
903-
}
904-
}
937+
Map<Long, ObTableParam> partIdIdMapObTable = getPartIdParamMapForQuery(
938+
tableName, scanRangeColumns, keyRanges);
905939
// for now only support to query single tablet
906-
if (tabletIdIdMapObTable.size() > 1) {
940+
if (partIdIdMapObTable.size() > 1) {
907941
throw new ObTablePartitionConsistentException(
908942
"query and mutate must be a atomic operation");
909-
} else if (tabletIdIdMapObTable.size() < 1) {
943+
} else if (partIdIdMapObTable.isEmpty()) {
910944
throw new ObTableException("could not find part id of range");
911945
}
912946
ObTableParam ans = null;
913-
for (Long tabletId : tabletIdIdMapObTable.keySet()) {
914-
ans = tabletIdIdMapObTable.get(tabletId);
915-
}
947+
Map.Entry<Long, ObTableParam> entry = partIdIdMapObTable.entrySet().iterator().next();
948+
ans = entry.getValue();
916949
return ans;
917950
}
918951

@@ -979,45 +1012,7 @@ private List<ObTableParam> getTablesInternal(String tableName, List<String> scan
9791012
List<ObTableParam> params = new ArrayList<>();
9801013
for (ObPair<Long, ReplicaLocation> partIdWithReplica : partIdWithReplicaList) {
9811014
long partId = partIdWithReplica.getLeft();
982-
long tabletId = getTabletIdByPartId(tableEntry, partId);
983-
ReplicaLocation replica = partIdWithReplica.getRight();
984-
ObServerAddr addr = replica.getAddr();
985-
ObTable obTable = tableRoster.getTable(addr);
986-
int retryTimes = 0;
987-
while (obTable == null && retryTimes < 2) {
988-
++retryTimes;
989-
// need to refresh table roster to ensure the current roster is the latest
990-
tableClient.syncRefreshMetadata(true);
991-
// the addr is wrong, need to refresh location
992-
if (logger.isInfoEnabled()) {
993-
logger.info("Cannot get ObTable by addr {}, refreshing metadata.", addr);
994-
}
995-
// refresh tablet location based on the latest roster, in case that some of the observers hase been killed
996-
// and used the old location
997-
tableEntry = refreshPartitionLocation(tableName, tabletId, tableEntry);
998-
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
999-
replica = getPartitionLocation(locationInfo, route);
1000-
1001-
if (replica == null) {
1002-
RUNTIME.error("Cannot get replica by tableName: {}, tabletId: {}", tableName, tabletId);
1003-
throw new ObTableGetException("Cannot get replica by tableName: " + tableName + ", tabletId: " + tabletId);
1004-
}
1005-
addr = replica.getAddr();
1006-
obTable = tableRoster.getTable(addr);
1007-
}
1008-
if (obTable == null) {
1009-
RUNTIME.error("cannot get table by addr: " + addr);
1010-
throw new ObTableGetException("obTable is null, addr is: " + addr.getIp() + ":" + addr.getSvrPort());
1011-
}
1012-
1013-
ObTableParam param = new ObTableParam(obTable);
1014-
param.setLsId(tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getTabletLsId());
1015-
param.setTableId(tableEntry.getTableId());
1016-
param.setPartId(partId);
1017-
// real tablet id
1018-
param.setPartitionId(tabletId);
1019-
1020-
addr.recordAccess();
1015+
ObTableParam param = getTableInternal(tableName, tableEntry, partId, route);
10211016
params.add(param);
10221017
}
10231018
return params;

0 commit comments

Comments
 (0)