Skip to content

Commit a53ba60

Browse files
authored
feature: Optimize partial refresh and add retry for executeMutation/query (#213)
* remove lock in refreshTableLocationByTabletId * fix refresh sql * fix refresh interval * add retry logic for common query * add retry logic for executeMutation * add retry logic for executeMutation * add retry logic for ObTableQueryAndMutateRequest * fix: correct ineffective retry logic * retry logic for ObTableClient execute * fix * fix
1 parent bec5329 commit a53ba60

File tree

7 files changed

+350
-281
lines changed

7 files changed

+350
-281
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 102 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
677677
tryTimes);
678678
if (ex instanceof ObTableNeedFetchAllException) {
679679
needFetchAllRouteInfo = true;
680+
getOrRefreshTableEntry(tableName, true, true, true);
680681
// reset failure count while fetch all route info
681682
this.resetExecuteContinuousFailureCount(tableName);
682683
}
@@ -767,7 +768,6 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
767768
throw new IllegalArgumentException("table name is null");
768769
}
769770
boolean needRefreshTableEntry = false;
770-
boolean needFetchAllRouteInfo = false;
771771
int tryTimes = 0;
772772
long startExecute = System.currentTimeMillis();
773773
while (true) {
@@ -787,10 +787,14 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
787787
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
788788
} else {
789789
if (null != callback.getRowKey()) {
790+
// in the case of retry, the location always needs to be refreshed here
791+
if (tryTimes > 1) {
792+
TableEntry entry = getOrRefreshTableEntry(tableName, false, false, false);
793+
Long partId = getPartition(entry, callback.getRowKey());
794+
refreshTableLocationByTabletId(entry, tableName, getTabletIdByPartId(entry, partId));
795+
}
790796
// using row key
791-
obPair = getTable(tableName, callback.getRowKey(),
792-
needRefreshTableEntry, tableEntryRefreshIntervalWait,
793-
needFetchAllRouteInfo, route);
797+
obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry, tableEntryRefreshIntervalWait, false, route);
794798
} else if (null != callback.getKeyRanges()) {
795799
// using scan range
796800
obPair = getTable(tableName, new ObTableQuery(),
@@ -852,7 +856,7 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
852856
((ObTableException) ex).getErrorCode(), ex.getMessage(),
853857
tryTimes);
854858
if (ex instanceof ObTableNeedFetchAllException) {
855-
needFetchAllRouteInfo = true;
859+
getOrRefreshTableEntry(tableName, needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true);
856860
// reset failure count while fetch all route info
857861
this.resetExecuteContinuousFailureCount(tableName);
858862
}
@@ -1336,33 +1340,14 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
13361340
}
13371341
long lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
13381342
long currentTime = System.currentTimeMillis();
1339-
if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) {
1343+
if (currentTime - lastRefreshTime < tableEntryRefreshIntervalCeiling) {
13401344
return tableEntry;
13411345
}
1342-
1343-
Lock lock = tableEntry.refreshLockMap.computeIfAbsent(tabletId, k -> new ReentrantLock());
1344-
1345-
if (!lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS)) {
1346-
String errMsg = String.format("Try to lock table-entry refreshing timeout. DataSource: %s, TableName: %s, Timeout: %d.",
1347-
dataSourceName, tableName, tableEntryRefreshLockTimeout);
1348-
RUNTIME.error(errMsg);
1349-
throw new ObTableEntryRefreshException(errMsg);
1350-
}
1351-
1352-
try {
1353-
lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
1354-
currentTime = System.currentTimeMillis();
1355-
if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) {
1356-
return tableEntry;
1357-
}
1358-
tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
1359-
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
1360-
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
1346+
tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
1347+
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
1348+
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
13611349

1362-
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
1363-
} finally {
1364-
lock.unlock();
1365-
}
1350+
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
13661351

13671352
} catch (ObTableNotExistException | ObTableServerCacheExpiredException e) {
13681353
RUNTIME.error("RefreshTableEntry encountered an exception", e);
@@ -1676,7 +1661,9 @@ private ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey,
16761661
}
16771662

16781663
long partId = getPartition(tableEntry, row); // partition id in 3.x, origin partId in 4.x, logicId
1679-
1664+
if (refresh) {
1665+
refreshTableLocationByTabletId(tableEntry, tableName, getTabletIdByPartId(tableEntry, partId));
1666+
}
16801667
return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route);
16811668
}
16821669

@@ -3149,41 +3136,95 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
31493136
request.setTimeout(getOdpTable().getObTableOperationTimeout());
31503137
return getOdpTable().execute(request);
31513138
} else {
3139+
int maxRetries = getRuntimeRetryTimes(); // Define the maximum number of retries
3140+
int tryTimes = 0;
3141+
long startExecute = System.currentTimeMillis();
3142+
boolean needRefreshTableEntry = false;
31523143
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
3153-
for (ObNewRange rang : tableQuery.getKeyRanges()) {
3154-
ObRowKey startKey = rang.getStartKey();
3155-
int startKeySize = startKey.getObjs().size();
3156-
ObRowKey endKey = rang.getEndKey();
3157-
int endKeySize = endKey.getObjs().size();
3158-
Object[] start = new Object[startKeySize];
3159-
Object[] end = new Object[endKeySize];
3160-
for (int i = 0; i < startKeySize; i++) {
3161-
start[i] = startKey.getObj(i).getValue();
3144+
while (true) {
3145+
long currentExecute = System.currentTimeMillis();
3146+
long costMillis = currentExecute - startExecute;
3147+
if (costMillis > getRuntimeMaxWait()) {
3148+
logger.error(
3149+
"tablename:{} it has tried " + tryTimes
3150+
+ " times and it has waited " + costMillis
3151+
+ "/ms which exceeds response timeout "
3152+
+ getRuntimeMaxWait() + "/ms", request.getTableName());
3153+
throw new ObTableTimeoutExcetion("it has tried " + tryTimes
3154+
+ " times and it has waited " + costMillis
3155+
+ "/ms which exceeds response timeout "
3156+
+ getRuntimeMaxWait() + "/ms");
31623157
}
3158+
try {
3159+
// Recalculate partIdMapObTable
3160+
// Clear the map before recalculating
3161+
partIdMapObTable.clear();
3162+
for (ObNewRange rang : tableQuery.getKeyRanges()) {
3163+
ObRowKey startKey = rang.getStartKey();
3164+
int startKeySize = startKey.getObjs().size();
3165+
ObRowKey endKey = rang.getEndKey();
3166+
int endKeySize = endKey.getObjs().size();
3167+
Object[] start = new Object[startKeySize];
3168+
Object[] end = new Object[endKeySize];
3169+
for (int i = 0; i < startKeySize; i++) {
3170+
start[i] = startKey.getObj(i).getValue();
3171+
}
31633172

3164-
for (int i = 0; i < endKeySize; i++) {
3165-
end[i] = endKey.getObj(i).getValue();
3166-
}
3167-
ObBorderFlag borderFlag = rang.getBorderFlag();
3168-
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
3169-
tableQuery, start, borderFlag.isInclusiveStart(), end,
3170-
borderFlag.isInclusiveEnd(), false, false);
3171-
for (ObPair<Long, ObTableParam> pair : pairList) {
3172-
partIdMapObTable.put(pair.getLeft(), pair.getRight());
3173-
}
3174-
}
3175-
if (partIdMapObTable.size() > 1) {
3176-
throw new ObTablePartitionConsistentException(
3177-
"query and mutate must be a atomic operation");
3178-
}
3173+
for (int i = 0; i < endKeySize; i++) {
3174+
end[i] = endKey.getObj(i).getValue();
3175+
}
3176+
ObBorderFlag borderFlag = rang.getBorderFlag();
3177+
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
3178+
tableQuery, start, borderFlag.isInclusiveStart(), end,
3179+
borderFlag.isInclusiveEnd(), needRefreshTableEntry, isTableEntryRefreshIntervalWait());
3180+
for (ObPair<Long, ObTableParam> pair : pairList) {
3181+
partIdMapObTable.put(pair.getLeft(), pair.getRight());
3182+
}
3183+
}
31793184

3180-
for (Long partId : partIdMapObTable.keySet()) {
3181-
ObTableParam tableParam = partIdMapObTable.get(partId);
3182-
request.setTableId(tableParam.getTableId());
3183-
request.setPartitionId(tableParam.getPartitionId());
3184-
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
3185-
ObTable obTable = tableParam.getObTable();
3186-
return executeWithRetry(obTable, request, request.getTableName());
3185+
// Check if partIdMapObTable size is greater than 1
3186+
if (partIdMapObTable.size() > 1) {
3187+
throw new ObTablePartitionConsistentException(
3188+
"query and mutate must be a atomic operation");
3189+
}
3190+
// Proceed with the operation
3191+
Map.Entry<Long, ObTableParam> entry = partIdMapObTable.entrySet().iterator().next();
3192+
ObTableParam tableParam = entry.getValue();
3193+
request.setTableId(tableParam.getTableId());
3194+
request.setPartitionId(tableParam.getPartitionId());
3195+
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
3196+
ObTable obTable = tableParam.getObTable();
3197+
3198+
// Attempt to execute the operation
3199+
return executeWithRetry(obTable, request, request.getTableName());
3200+
} catch (Exception ex) {
3201+
tryTimes++;
3202+
if (ex instanceof ObTableException && ((ObTableException) ex).isNeedRefreshTableEntry()) {
3203+
needRefreshTableEntry = true;
3204+
logger.warn(
3205+
"tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}",
3206+
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(), ex);
3207+
3208+
if (isRetryOnChangeMasterTimes() && tryTimes <= maxRetries) {
3209+
logger.warn(
3210+
"tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}",
3211+
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(),
3212+
tryTimes, ex);
3213+
3214+
if (ex instanceof ObTableNeedFetchAllException) {
3215+
// Refresh table info
3216+
getOrRefreshTableEntry(request.getTableName(), needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true);
3217+
}
3218+
} else {
3219+
calculateContinuousFailure(request.getTableName(), ex.getMessage());
3220+
throw ex;
3221+
}
3222+
} else {
3223+
calculateContinuousFailure(request.getTableName(), ex.getMessage());
3224+
// Handle other exceptions or rethrow
3225+
throw ex;
3226+
}
3227+
}
31873228
}
31883229
}
31893230
}

0 commit comments

Comments
 (0)