Skip to content

Commit 6c2b1c2

Browse files
authored
Capture exceptions in the refreshIndexInfo and do syncRefreshMetadata (#301)
* Capture exceptions in the refreshIndexInfo and do syncRefreshMetadata * fix * fix commonExecute not catch refreshException
1 parent dce57a2 commit 6c2b1c2

File tree

4 files changed

+85
-21
lines changed

4 files changed

+85
-21
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,19 +1141,41 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean fo
11411141
logger.info("index info is not exist, create new index info, indexTableName: {}",
11421142
indexTableName);
11431143
int serverSize = serverRoster.getMembers().size();
1144-
int refreshTryTimes = tableEntryRefreshTryTimes > serverSize ? serverSize
1145-
: tableEntryRefreshTryTimes;
1144+
int refreshTryTimes = Math.min(tableEntryRefreshTryTimes, serverSize);
11461145
for (int i = 0; i < refreshTryTimes; i++) {
1147-
ObServerAddr serverAddr = serverRoster.getServer(serverAddressPriorityTimeout,
1148-
serverAddressCachingTimeout);
1149-
indexInfo = getIndexInfoFromRemote(serverAddr, sysUA,
1150-
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
1151-
indexTableName);
1152-
if (indexInfo != null) {
1153-
indexinfos.put(indexTableName, indexInfo);
1154-
} else {
1155-
RUNTIME.error("get index info from remote is null, indexTableName: {}",
1156-
indexTableName);
1146+
try {
1147+
ObServerAddr serverAddr = serverRoster.getServer(serverAddressPriorityTimeout,
1148+
serverAddressCachingTimeout);
1149+
if (serverAddr.isExpired(serverAddressCachingTimeout)) {
1150+
syncRefreshMetadata(false);
1151+
}
1152+
indexInfo = getIndexInfoFromRemote(serverAddr, sysUA,
1153+
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
1154+
indexTableName);
1155+
if (indexInfo != null) {
1156+
indexinfos.put(indexTableName, indexInfo);
1157+
break;
1158+
} else {
1159+
RUNTIME.error("get index info from remote is null, indexTableName: {}",
1160+
indexTableName);
1161+
}
1162+
} catch (ObTableServerCacheExpiredException e) {
1163+
RUNTIME.error("get index info from remote meet exception", e);
1164+
syncRefreshMetadata(false);
1165+
} catch (ObTableEntryRefreshException e) {
1166+
RUNTIME.error("get index info from remote meet exception", e);
1167+
if (tableEntryRefreshContinuousFailureCount.incrementAndGet() > tableEntryRefreshContinuousFailureCeiling) {
1168+
logger.error(LCD.convert("01-00019"),
1169+
tableEntryRefreshContinuousFailureCeiling);
1170+
syncRefreshMetadata(false);
1171+
tableEntryRefreshContinuousFailureCount.set(0);
1172+
} else if (e.isConnectInactive()) {
1173+
syncRefreshMetadata(false);
1174+
tableEntryRefreshContinuousFailureCount.set(0);
1175+
}
1176+
} catch (Throwable t) {
1177+
RUNTIME.error("getOrRefreshTableEntry meet exception", t);
1178+
throw t;
11571179
}
11581180
}
11591181
return indexInfo;

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,10 @@ public static ObIndexInfo getIndexInfoFromRemote(ObServerAddr obServerAddr, ObUs
10501050
} else {
10511051
throw new ObTableEntryRefreshException("index is not exist");
10521052
}
1053+
} catch (SQLException e) {
1054+
// cannot execute sql, maybe some of the observers have been killed
1055+
RUNTIME.error(LCD.convert("01-00010"), indexTableName, e.getMessage());
1056+
throw new ObTableEntryRefreshException("fail to get index info from remote", e, true);
10531057
} catch (Exception e) {
10541058
if (e instanceof ObTableEntryRefreshException) {
10551059
throw new ObTableEntryRefreshException(format(

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
import com.alipay.oceanbase.rpc.ObGlobal;
2121
import com.alipay.oceanbase.rpc.ObTableClient;
2222
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
23-
import com.alipay.oceanbase.rpc.exception.ObTableException;
24-
import com.alipay.oceanbase.rpc.exception.ObTableNeedFetchAllException;
25-
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
23+
import com.alipay.oceanbase.rpc.exception.*;
2624
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2725
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2826
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
@@ -329,9 +327,27 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
329327
}
330328

331329
// execute request
332-
ObTableQueryAsyncResult result = (ObTableQueryAsyncResult) commonExecute(this.client,
333-
logger, partIdWithObTable, streamRequest, connectionRef);
334-
330+
ObTableQueryAsyncResult result = null;
331+
for (int i = 0; i < client.getRuntimeRetryTimes(); i++) {
332+
try {
333+
result = (ObTableQueryAsyncResult) commonExecute(this.client,
334+
logger, partIdWithObTable, streamRequest, connectionRef);
335+
break;
336+
} catch (ObTableServerCacheExpiredException e) {
337+
client.syncRefreshMetadata(false);
338+
} catch (ObTableEntryRefreshException e) {
339+
if (e.isConnectInactive()) {
340+
client.syncRefreshMetadata(false);
341+
} else {
342+
throw e;
343+
}
344+
} catch (Throwable t) {
345+
throw t;
346+
}
347+
}
348+
if (result == null) {
349+
throw new ObTableRetryExhaustedException("exhaust retry times " + client.getRuntimeRetryTimes());
350+
}
335351
// cache result
336352
cacheResultRows(result);
337353

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
import com.alipay.oceanbase.rpc.ObTableClient;
2121
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
22+
import com.alipay.oceanbase.rpc.exception.ObTableEntryRefreshException;
23+
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
24+
import com.alipay.oceanbase.rpc.exception.ObTableServerCacheExpiredException;
2225
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2326
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2427
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
@@ -62,9 +65,28 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
6265
AtomicReference<ObTableConnection> connectionRef = new AtomicReference<>();
6366

6467
// execute request
65-
ObTableQueryResult result = (ObTableQueryResult) commonExecute(this.client, logger,
66-
partIdWithIndex, request, connectionRef);
67-
68+
ObTableQueryResult result = null;
69+
for (int i = 0; i < client.getRuntimeRetryTimes(); i++) {
70+
try {
71+
result = (ObTableQueryResult) commonExecute(this.client, logger,
72+
partIdWithIndex, request, connectionRef);
73+
break;
74+
} catch (ObTableServerCacheExpiredException e) {
75+
client.syncRefreshMetadata(false);
76+
} catch (ObTableEntryRefreshException e) {
77+
if (e.isConnectInactive()) {
78+
client.syncRefreshMetadata(false);
79+
} else {
80+
throw e;
81+
}
82+
} catch (Throwable t) {
83+
throw t;
84+
}
85+
}
86+
if (result == null) {
87+
throw new ObTableRetryExhaustedException("exhaust retry times " + client.getRuntimeRetryTimes());
88+
}
89+
6890
cacheStreamNext(partIdWithIndex, checkObTableQueryResult(result));
6991

7092
return result;

0 commit comments

Comments
 (0)