Skip to content

Commit d51e6c6

Browse files
committed
feature: Optimize partial refresh and add retry for executeMutation/query (#213)
* remove lock in refreshTableLocationByTabletId * fix refresh sql * fix refresh interval * add retry logic for common query * add retry logic for executeMutation * add retry logic for executeMutation * add retry logic for ObTableQueryAndMutateRequest * fix: correct ineffective retry logic * retry logic for ObTableClient execute * fix * fix
1 parent 11707bf commit d51e6c6

File tree

6 files changed

+262
-248
lines changed

6 files changed

+262
-248
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
678678
tryTimes);
679679
if (ex instanceof ObTableNeedFetchAllException) {
680680
needFetchAllRouteInfo = true;
681+
getOrRefreshTableEntry(tableName, true, true, true);
681682
// reset failure count while fetch all route info
682683
this.resetExecuteContinuousFailureCount(tableName);
683684
}
@@ -768,7 +769,6 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
768769
throw new IllegalArgumentException("table name is null");
769770
}
770771
boolean needRefreshTableEntry = false;
771-
boolean needFetchAllRouteInfo = false;
772772
int tryTimes = 0;
773773
long startExecute = System.currentTimeMillis();
774774
while (true) {
@@ -788,10 +788,14 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
788788
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
789789
} else {
790790
if (null != callback.getRowKey()) {
791+
// in the case of retry, the location always needs to be refreshed here
792+
if (tryTimes > 1) {
793+
TableEntry entry = getOrRefreshTableEntry(tableName, false, false, false);
794+
Long partId = getPartition(entry, callback.getRowKey());
795+
refreshTableLocationByTabletId(entry, tableName, getTabletIdByPartId(entry, partId));
796+
}
791797
// using row key
792-
obPair = getTable(tableName, callback.getRowKey(),
793-
needRefreshTableEntry, tableEntryRefreshIntervalWait,
794-
needFetchAllRouteInfo, route);
798+
obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry, tableEntryRefreshIntervalWait, false, route);
795799
} else if (null != callback.getKeyRanges()) {
796800
// using scan range
797801
obPair = getTable(tableName, new ObTableQuery(),
@@ -853,7 +857,7 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
853857
((ObTableException) ex).getErrorCode(), ex.getMessage(),
854858
tryTimes);
855859
if (ex instanceof ObTableNeedFetchAllException) {
856-
needFetchAllRouteInfo = true;
860+
getOrRefreshTableEntry(tableName, needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true);
857861
// reset failure count while fetch all route info
858862
this.resetExecuteContinuousFailureCount(tableName);
859863
}
@@ -1337,33 +1341,14 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
13371341
}
13381342
long lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
13391343
long currentTime = System.currentTimeMillis();
1340-
if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) {
1344+
if (currentTime - lastRefreshTime < tableEntryRefreshIntervalCeiling) {
13411345
return tableEntry;
13421346
}
1343-
1344-
Lock lock = tableEntry.refreshLockMap.computeIfAbsent(tabletId, k -> new ReentrantLock());
1345-
1346-
if (!lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS)) {
1347-
String errMsg = String.format("Try to lock table-entry refreshing timeout. DataSource: %s, TableName: %s, Timeout: %d.",
1348-
dataSourceName, tableName, tableEntryRefreshLockTimeout);
1349-
RUNTIME.error(errMsg);
1350-
throw new ObTableEntryRefreshException(errMsg);
1351-
}
1347+
tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
1348+
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
1349+
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
13521350

1353-
try {
1354-
lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
1355-
currentTime = System.currentTimeMillis();
1356-
if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) {
1357-
return tableEntry;
1358-
}
1359-
tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
1360-
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
1361-
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
1362-
1363-
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
1364-
} finally {
1365-
lock.unlock();
1366-
}
1351+
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
13671352

13681353
} catch (ObTableNotExistException | ObTableServerCacheExpiredException e) {
13691354
RUNTIME.error("RefreshTableEntry encountered an exception", e);
@@ -1676,7 +1661,9 @@ private ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey,
16761661
}
16771662

16781663
long partId = getPartition(tableEntry, row); // partition id in 3.x, origin partId in 4.x, logicId
1679-
1664+
if (refresh) {
1665+
refreshTableLocationByTabletId(tableEntry, tableName, getTabletIdByPartId(tableEntry, partId));
1666+
}
16801667
return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route);
16811668
}
16821669

0 commit comments

Comments
 (0)