Skip to content

Commit 9245360

Browse files
committed
1. Aligned error codes with ODP retry handling. 2. Refined the semantics of getOrRefreshTableEntry functions other than getTableXXX should not call it directly.
1 parent 71a3e1a commit 9245360

File tree

6 files changed

+52
-23
lines changed

6 files changed

+52
-23
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -788,12 +788,6 @@ public void calculateContinuousFailure(String tableName, String errorMsg) throws
788788
}
789789
}
790790

791-
public void resetContinuousFailureByTableName(String tableName) {
792-
AtomicLong res = tableContinuousFailures.get(tableName);
793-
if (res != null) {
794-
res.set(0);
795-
}
796-
}
797791

798792
/**
799793
* Reset execute continuous failure count.
@@ -1530,9 +1524,9 @@ public ObPair<Long, ObTableParam> getTableByRowKeyRange(String tableName, ObTabl
15301524
* @throws Exception exception
15311525
*/
15321526
public ObPair<Long, ObTableParam> getTableWithPartId(String tableName, long partId,
1533-
boolean refresh, boolean waitForRefresh,
1527+
boolean refresh, boolean waitForRefresh, boolean needFetchAll,
15341528
ObServerRoute route) throws Exception {
1535-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
1529+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, needFetchAll);
15361530
return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route);
15371531
}
15381532

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@
2929
import com.alipay.remoting.*;
3030
import com.alipay.remoting.exception.RemotingException;
3131
import io.netty.buffer.ByteBuf;
32+
import io.netty.util.internal.ResourcesUtil;
3233
import org.slf4j.Logger;
3334

35+
import javax.xml.transform.Result;
36+
3437
import static com.alipay.oceanbase.rpc.protocol.packet.ObCompressType.INVALID_COMPRESSOR;
3538
import static com.alipay.oceanbase.rpc.protocol.packet.ObCompressType.NONE_COMPRESSOR;
3639

@@ -126,10 +129,17 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
126129
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127130
"routed to the wrong server: " + response.getMessage());
128131
logger.warn(errMessage);
129-
if (needFetchAll(resultCode.getRcode())) {
132+
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
130133
throw new ObTableNeedFetchAllException(errMessage);
131-
} else {
134+
} else if (needFetchPartical(resultCode.getRcode())) {
132135
throw new ObTableRoutingWrongException(errMessage);
136+
} else {
137+
// Encountered an unexpected RoutingWrong error code,
138+
// possibly due to the client error code version being behind the observer's version.
139+
// Attempting a full refresh here
140+
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
141+
logger.warn("get unexpected error code: " + response.getMessage());
142+
throw new ObTableNeedFetchAllException(errMessage);
133143
}
134144
}
135145
if (resultCode.getRcode() != 0) {
@@ -172,12 +182,34 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
172182
return new ObClientFuture(request.getId());
173183
}
174184

175-
private boolean needFetchAll(int errorCode) {
176-
return errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
177-
|| errorCode == ResultCodes.OB_INVALID_PARTITION.errorCode
178-
|| errorCode == ResultCodes.OB_UNKNOWN_PARTITION.errorCode
179-
|| errorCode == ResultCodes.OB_PARTITION_NOT_MATCH.errorCode
180-
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
181-
|| errorCode == ResultCodes.OB_REPLICA_NOT_READABLE.errorCode;
185+
// schema changed
186+
private boolean needFetchAll(int errorCode, int pcode) {
187+
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
188+
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
189+
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
190+
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
191+
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode)
192+
// Theoretically, this code should not be executed since there is no corresponding handling within the DOP.
193+
// However, the observer has flagged the following three error codes.
194+
// Adding this handling as a precautionary measure.
195+
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode
196+
|| errorCode == ResultCodes.OB_USE_DUP_FOLLOW_AFTER_DML.errorCode
197+
|| errorCode == ResultCodes.OB_TRANS_STMT_NEED_RETRY.errorCode;
198+
}
199+
private boolean needFetchPartical(int errorCode) {
200+
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
201+
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
202+
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
203+
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
204+
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
205+
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
206+
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
207+
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
208+
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
209+
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
210+
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
211+
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
212+
|| errorCode == ResultCodes.OB_TENANT_NOT_IN_SERVER.errorCode
213+
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode;
182214
}
183215
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,8 @@ public enum ResultCodes {
358358
OB_CLUSTER_NO_MATCH(-4666), //
359359
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
360360
OB_ERR_ZONE_NOT_EMPTY(-4668), //
361+
OB_USE_DUP_FOLLOW_AFTER_DML(-4686),
362+
OB_LS_NOT_EXIST(-4719), //
361363
OB_TABLET_NOT_EXIST(-4725), //
362364
OB_ERR_PARSER_INIT(-5000), //
363365
OB_ERR_PARSE_SQL(-5001), //
@@ -666,6 +668,7 @@ public enum ResultCodes {
666668
OB_PARTITION_IS_BLOCKED(-6229), //
667669
OB_TRANS_RPC_TIMEOUT(-6230), //
668670
OB_REPLICA_NOT_READABLE(-6231), //
671+
OB_TRANS_STMT_NEED_RETRY(-6241), //
669672
OB_LOG_ID_NOT_FOUND(-6301), //
670673
OB_LSR_THREAD_STOPPED(-6302), //
671674
OB_NO_LOG(-6303), //

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
138138
}
139139
subObTable = client
140140
.getTableWithPartId(indexTableName, partIdWithIndex.getLeft(),
141-
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
141+
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), false,
142142
route).getRight().getObTable();
143143
}
144144
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ public void partitionExecute(ObTableOperationResult[] results,
298298
ObTableBatchOperationResult subObTableBatchOperationResult;
299299

300300
boolean needRefreshTableEntry = false;
301+
boolean needFetchAllRouteInfo = false;
301302
int tryTimes = 0;
302303
long startExecute = System.currentTimeMillis();
303304
Set<String> failedServerList = null;
@@ -334,7 +335,7 @@ public void partitionExecute(ObTableOperationResult[] results,
334335
}
335336
ObTableParam newParam = obTableClient.getTableWithPartId(tableName,
336337
originPartId, needRefreshTableEntry,
337-
obTableClient.isTableEntryRefreshIntervalWait(), route).getRight();
338+
obTableClient.isTableEntryRefreshIntervalWait(), needFetchAllRouteInfo, route).getRight();
338339

339340
subObTable = newParam.getObTable();
340341
subRequest.setPartitionId(newParam.getPartitionId());
@@ -383,10 +384,9 @@ public void partitionExecute(ObTableOperationResult[] results,
383384
tableName, partId, ((ObTableException) ex).getErrorCode(),
384385
tryTimes, ex);
385386
if (ex instanceof ObTableNeedFetchAllException) {
386-
obTableClient.getOrRefreshTableEntry(tableName, true,
387-
obTableClient.isTableEntryRefreshIntervalWait(), true);
387+
needFetchAllRouteInfo = true;
388388
// reset failure count while fetch all route info
389-
obTableClient.resetContinuousFailureByTableName(tableName);
389+
obTableClient.resetExecuteContinuousFailureCount(tableName);
390390
}
391391
} else {
392392
obTableClient.calculateContinuousFailure(tableName, ex.getMessage());

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
460460
route.setBlackList(failedServerList);
461461
}
462462
subObTable = obTableClient.getTableWithPartId(tableName, originPartId, needRefreshTableEntry,
463-
obTableClient.isTableEntryRefreshIntervalWait(), route).
463+
obTableClient.isTableEntryRefreshIntervalWait(), false, route).
464464
getRight().getObTable();
465465
}
466466
}

0 commit comments

Comments
 (0)