Skip to content

Commit 12ef195

Browse files
authored
Refactor table routing process (#310)
* refactor table routing process for better capacity of disaster-recovery, routing-atomicity and better retry structure * fix schema version not updated in retry * add log for table not exist in truncate * rebase secondary_part master * add log, add time check when refresh rsList * refresh tableEntry object when refreshing tablet location meeting schema_version mismatched * enable get connection num in odp mode * fix single observer do not retry * enable server version checking in odp mode * remove ls batch log * add ls_id refreshing log; refreshMeta retry uses tableEntryRefreshTryTimes * set refresh interval to 100ms from 300ms to ensure refreshing correct ls_id when ls batch retry * refresh tablet location to refresh leader observer when transport timeout * rebase table master * use runtimeMaxWait to keep refreshing table meta and tablet location to ensure correctness * use master refreshing interval; use runtimeMaxWait in batch execution * refresh batch tablet locations when drop table using lsbatch, use runtimeBatchMaxWait to retry lsbatch * add refresh metadata log * add valid flag to obTable to avoid unnecessary reconnect if observer down * set dirty if fail to reconnect * unify to check schema version and throw exception * use runtimeMaxWait to replace tryTimes in async stream result * set ThreadLocalMap HashMap to ConcurrentMap; check valid in executeWithConnection * set dirty in normal batch if observer down; use tableEntryRefreshLockTimeout to block refreshing rs * optimize by review * reset cached result in sync query init if need to refresh table meta; refresh tablet location in sync query and normal batch if needed
1 parent 59d4a69 commit 12ef195

File tree

57 files changed

+4760
-3063
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+4760
-3063
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ public static boolean isHBaseBatchSupport() {
105105
|| OB_VERSION >= OB_VERSION_4_3_5_0;
106106
}
107107

108+
public static boolean isSchemaVersionSupport() {
109+
return OB_VERSION >= OB_VERSION_4_3_5_2;
110+
}
111+
108112
public static boolean isDistributedExecSupport() {
109113
return OB_VERSION >= OB_VERSION_4_3_5_2;
110114
}

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 468 additions & 2013 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ private void login() throws Exception {
195195
// no need to retry when the username or password is wrong.
196196
if (e instanceof ObTableAuthException) {
197197
throw new ObTableLoginException(errMessage, e);
198+
} else if (e instanceof FeatureNotSupportedException) {
199+
throw e;
198200
}
199201
}
200202
}
@@ -230,6 +232,7 @@ public void checkStatus() throws Exception {
230232
reconnect("Check connection is null");
231233
}
232234
if (connection.getChannel() == null || !connection.getChannel().isActive()) {
235+
LOGGER.warn("[latency monitor] need to reconnect server: {}:{}", obTable.getIp(), obTable.getPort());
233236
reconnect("Check connection failed for address: " + connection.getUrl());
234237
}
235238
if (!connection.getChannel().isWritable()) {

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
8484
return null;
8585
} else if (!response.isSuccess()) {
8686
String errMessage = TraceUtil.formatTraceMessage(conn, request,
87-
"get an error response: " + response.getMessage());
87+
"get an error response: " + response.getMessage() + ", transportCode: " + response.getTransportCode());
8888
logger.warn(errMessage);
8989
response.releaseByteBuf();
9090
ExceptionUtil.throwObTableTransportException(errMessage, response.getTransportCode());
@@ -125,30 +125,30 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
125125
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127127
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
128-
+ response.getMessage());
128+
+ resultCode.getErrMsg());
129129
logger.debug(errMessage);
130-
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
131-
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
132-
} else if (needFetchPartial(resultCode.getRcode())) {
130+
if (needFetchMeta(resultCode.getRcode(), resultCode.getPcode())) {
131+
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
132+
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
133133
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
134134
} else {
135135
// Encountered an unexpected RoutingWrong error code,
136136
// possibly due to the client error code version being behind the observer's version.
137137
// Attempting a full refresh here
138138
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
139139
logger.warn("get unexpected error code: {}", errMessage);
140-
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
140+
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
141141
}
142142
}
143143
if (resultCode.getRcode() != 0
144144
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
145145
String errMessage = TraceUtil.formatTraceMessage(conn, request,
146146
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
147-
+ response.getMessage());
147+
+ resultCode.getErrMsg());
148148
logger.debug(errMessage);
149-
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
150-
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
151-
} else if (needFetchPartial(resultCode.getRcode())) {
149+
if (needFetchMeta(resultCode.getRcode(), resultCode.getPcode())) {
150+
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
151+
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
152152
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
153153
} else {
154154
logger.warn(errMessage);
@@ -192,7 +192,7 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
192192
}
193193

194194
// schema changed
195-
private boolean needFetchAll(int errorCode, int pcode) {
195+
private boolean needFetchMeta(int errorCode, int pcode) {
196196
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
197197
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
198198
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
@@ -202,10 +202,11 @@ private boolean needFetchAll(int errorCode, int pcode) {
202202
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode
203203
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
204204
|| errorCode == ResultCodes.OB_GTS_NOT_READY.errorCode
205+
|| errorCode == ResultCodes.OB_ERR_OPERATION_ON_RECYCLE_OBJECT.errorCode
205206
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
206207
}
207208

208-
private boolean needFetchPartial(int errorCode) {
209+
private boolean needFetchPartitionLocation(int errorCode) {
209210
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
210211
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
211212
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode

src/main/java/com/alipay/oceanbase/rpc/checkandmutate/CheckAndInsUp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public MutationResult execute() throws Exception {
109109
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE,
110110
insUp.getRowKey().getValues(), insUp.getColumns(), insUp.getValues());
111111

112-
return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation,
112+
return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, operation,
113113
false, true, checkExists, rollbackWhenCheckFailed));
114114
}
115115
}

src/main/java/com/alipay/oceanbase/rpc/exception/ObTableEntryRefreshException.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package com.alipay.oceanbase.rpc.exception;
1919

2020
public class ObTableEntryRefreshException extends ObTableException {
21-
21+
2222
private boolean connectInactive = false;
2323

2424
/*
@@ -54,7 +54,7 @@ public ObTableEntryRefreshException(String message) {
5454
public ObTableEntryRefreshException(String message, Throwable cause) {
5555
super(message, cause);
5656
}
57-
57+
5858
public ObTableEntryRefreshException(String message, Throwable cause, boolean connectInactive) {
5959
super(message, cause);
6060
this.connectInactive = connectInactive;
@@ -63,6 +63,7 @@ public ObTableEntryRefreshException(String message, Throwable cause, boolean con
6363
public boolean isConnectInactive() {
6464
return connectInactive;
6565
}
66+
6667
/*
6768
* Ob table entry refresh exception.
6869
*/

src/main/java/com/alipay/oceanbase/rpc/exception/ObTableException.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package com.alipay.oceanbase.rpc.exception;
1919

20+
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
21+
2022
public class ObTableException extends RuntimeException {
2123

22-
private int errorCode;
23-
private boolean needRefreshTableEntry;
24+
private int errorCode = -1;
25+
private boolean needRefreshTableEntry = false;
2426

2527
/*
2628
* Ob table exception.
@@ -82,4 +84,10 @@ public boolean isNeedRefreshTableEntry() {
8284
return needRefreshTableEntry;
8385
}
8486

87+
public boolean isNeedRetryError() {
88+
return errorCode == ResultCodes.OB_TRY_LOCK_ROW_CONFLICT.errorCode
89+
|| errorCode == ResultCodes.OB_TRANSACTION_SET_VIOLATION.errorCode
90+
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode;
91+
}
92+
8593
}

src/main/java/com/alipay/oceanbase/rpc/exception/ObTableNeedFetchAllException.java renamed to src/main/java/com/alipay/oceanbase/rpc/exception/ObTableNeedFetchMetaException.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,45 +17,47 @@
1717

1818
package com.alipay.oceanbase.rpc.exception;
1919

20-
public class ObTableNeedFetchAllException extends ObTableException {
20+
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
21+
22+
public class ObTableNeedFetchMetaException extends ObTableException {
2123
/*
2224
* Ob table routing wrong exception.
2325
*/
24-
public ObTableNeedFetchAllException() {
26+
public ObTableNeedFetchMetaException() {
2527
}
2628

2729
/*
2830
* Ob table routing wrong exception with error code.
2931
*/
30-
public ObTableNeedFetchAllException(int errorCode) {
32+
public ObTableNeedFetchMetaException(int errorCode) {
3133
super(errorCode);
3234
}
3335

3436
/*
3537
* Ob table routing wrong exception with message and error code.
3638
*/
37-
public ObTableNeedFetchAllException(String message, int errorCode) {
39+
public ObTableNeedFetchMetaException(String message, int errorCode) {
3840
super(message, errorCode);
3941
}
4042

4143
/*
4244
* Ob table routing wrong exception with message.
4345
*/
44-
public ObTableNeedFetchAllException(String message) {
46+
public ObTableNeedFetchMetaException(String message) {
4547
super(message);
4648
}
4749

4850
/*
4951
* Ob table routing wrong exception with message and cause.
5052
*/
51-
public ObTableNeedFetchAllException(String message, Throwable cause) {
53+
public ObTableNeedFetchMetaException(String message, Throwable cause) {
5254
super(message, cause);
5355
}
5456

5557
/*
5658
* Ob table routing wrong exception with cause.
5759
*/
58-
public ObTableNeedFetchAllException(Throwable cause) {
60+
public ObTableNeedFetchMetaException(Throwable cause) {
5961
super(cause);
6062
}
6163

@@ -65,4 +67,13 @@ public ObTableNeedFetchAllException(Throwable cause) {
6567
public boolean isNeedRefreshTableEntry() {
6668
return true;
6769
}
70+
71+
public boolean isNeedRefreshMetaAndLocation() {
72+
return getErrorCode() == ResultCodes.OB_LS_NOT_EXIST.errorCode // need to refresh the whole tablets in this ls
73+
|| getErrorCode() == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode // fetch a wrong ls tablets, need to refetch locations
74+
|| getErrorCode() == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
75+
|| getErrorCode() == ResultCodes.OB_ERR_OPERATION_ON_RECYCLE_OBJECT.errorCode // table has been drop and recreated
76+
|| getErrorCode() == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
77+
|| getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode; // drop table
78+
}
6879
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.exception;
19+
20+
public class ObTableSchemaVersionMismatchException extends ObTableException {
21+
/*
22+
* Ob table schema version mismatch exception.
23+
*/
24+
public ObTableSchemaVersionMismatchException() {
25+
}
26+
27+
/*
28+
* Ob table schema version mismatch exception with error code.
29+
*/
30+
public ObTableSchemaVersionMismatchException(int errorCode) {
31+
super(errorCode);
32+
}
33+
34+
/*
35+
* Ob table schema version mismatch exception with message and error code.
36+
*/
37+
public ObTableSchemaVersionMismatchException(String message, int errorCode) {
38+
super(message, errorCode);
39+
}
40+
41+
/*
42+
* Ob table schema version mismatch exception with message.
43+
*/
44+
public ObTableSchemaVersionMismatchException(String message) {
45+
super(message);
46+
}
47+
48+
/*
49+
* Ob table schema version mismatch exception with message and cause.
50+
*/
51+
public ObTableSchemaVersionMismatchException(String message, Throwable cause) {
52+
super(message, cause);
53+
}
54+
55+
/*
56+
* Ob table schema version mismatch exception with case.
57+
*/
58+
public ObTableSchemaVersionMismatchException(Throwable cause) {
59+
super(cause);
60+
}
61+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.exception;
19+
20+
public class ObTableTryLockTimeoutException extends ObTableException {
21+
/*
22+
* Ob table try lock timeout exception.
23+
*/
24+
public ObTableTryLockTimeoutException() {
25+
}
26+
27+
/*
28+
* Ob table try lock timeout exception.
29+
*/
30+
public ObTableTryLockTimeoutException(int errorCode) {
31+
super(errorCode);
32+
}
33+
34+
/*
35+
* Ob table try lock timeout exception.
36+
*/
37+
public ObTableTryLockTimeoutException(String message, int errorCode) {
38+
super(message, errorCode);
39+
}
40+
41+
/*
42+
* Ob table try lock timeout exception.
43+
*/
44+
public ObTableTryLockTimeoutException(String message) {
45+
super(message);
46+
}
47+
48+
/*
49+
* Ob table try lock timeout exception.
50+
*/
51+
public ObTableTryLockTimeoutException(String message, Throwable cause) {
52+
super(message, cause);
53+
}
54+
55+
/*
56+
* Ob table try lock timeout exception.
57+
*/
58+
public ObTableTryLockTimeoutException(Throwable cause) {
59+
super(cause);
60+
}
61+
}

0 commit comments

Comments
 (0)