Skip to content

Commit d8b4634

Browse files
Hbase 441 feature (#381)
* Hbase put perf (#368) * new hbase protocal (#363) * new hbase request and result for multi-cf * odp support new hbase protocol * order table_name when get by tablegroup --------- Co-authored-by: vanson <[email protected]> * Add rerouting error feedback refreshing logic (#358) * refresh table meta and locatino based on flag, refresh cache after retry successfully in server * remove lsop + OB_NOT_MASTER * change debug log to warn * make execute and operation time longer * add debug log * revert log level and operation timeout * Fix old client and odp disable to refresh tableEntry if using new server (#371) * add server_can_retry flag to enable retry capacity for new client operations * add ObBinlogRowImageType for QueryAndMutateRequest to skip the space of encode * heap table tests (#374) * Fix scan sess missing transfer and single cf table put (#375) * generate new query request when hash_not_exist during query_next * fix isKeyInRange when encounter MinObObj or MaxObObj * fix hbase put sinlge cf table * do not refresh tableEntry if require_rerouting is true in odp mode * add valid status check when add new obtable (#377) * remove invalid table (#378) * set dirty and add ip to suspect list (#379) --------- Co-authored-by: vanson <[email protected]>
1 parent 7b052ee commit d8b4634

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+10095
-173
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ public static boolean isCellTTLSupport() {
121121
return OB_VERSION >= OB_VERSION_4_3_5_1;
122122
}
123123

124+
public static boolean isHBasePutPerfSupport() {
125+
return OB_VERSION >= OB_VERSION_4_4_1_0;
126+
}
127+
128+
/*-------------------------------------------- OB_VERSION --------------------------------------------*/
129+
124130
public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);
125131

126132
public static final long OB_VERSION_4_2_5_2 = calcVersion(4, (short) 2, (byte) 5, (byte) 2);
@@ -139,5 +145,15 @@ public static boolean isCellTTLSupport() {
139145

140146
public static final long OB_VERSION_4_4_0_0 = calcVersion(4, (short) 4, (byte) 0, (byte) 0);
141147

148+
public static final long OB_VERSION_4_4_1_0 = calcVersion(4, (short) 4, (byte) 1, (byte) 0);
149+
142150
public static long OB_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
151+
152+
/*-------------------------------------------- OB_PROXY_VERSION --------------------------------------------*/
153+
154+
public static final long OB_PROXY_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0);
155+
156+
public static final long OB_PROXY_VERSION_4_3_6_0 = calcVersion(4, (short) 3, (byte) 6, (byte) 0);
157+
158+
public static long OB_PROXY_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
143159
}

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,7 @@ public void calculateContinuousFailure(String tableName, String errorMsg) throws
837837
if (failures.incrementAndGet() > runtimeContinuousFailureCeiling) {
838838
logger.warn("refresh table entry {} while execute failed times exceeded {}, msg: {}",
839839
tableName, runtimeContinuousFailureCeiling, errorMsg);
840-
refreshMeta(tableName);
840+
tableRoute.refreshMeta(tableName);
841841
failures.set(0);
842842
} else {
843843
logger.warn("error msg: {}, current continues failure count: {}", errorMsg, failures);
@@ -968,14 +968,6 @@ public TableEntry getOrRefreshTableEntry(final String tableName, boolean forceRe
968968
if (!forceRefresh) {
969969
return tableRoute.getTableEntry(tableName);
970970
}
971-
return refreshMeta(tableName);
972-
}
973-
974-
/**
975-
* refresh table meta information except location
976-
* @param tableName table name
977-
* */
978-
private TableEntry refreshMeta(String tableName) throws Exception {
979971
return tableRoute.refreshMeta(tableName);
980972
}
981973

@@ -1201,7 +1193,10 @@ public void dealWithRpcTimeoutForBatchTablet(ObServerAddr addr, String tableName
12011193
*/
12021194
public String tryGetTableNameFromTableGroupCache(final String tableGroupName,
12031195
final boolean refresh) throws Exception {
1204-
return tableRoute.tryGetTableNameFromTableGroupCache(tableGroupName, refresh);
1196+
if (isTableGroupName(tableGroupName)) {
1197+
return tableRoute.tryGetTableNameFromTableGroupCache(tableGroupName, refresh);
1198+
}
1199+
return tableGroupName;
12051200
}
12061201

12071202
/**
@@ -2102,6 +2097,18 @@ public ObPayload executeWithRetry(ObTable obTable, ObPayload request, String tab
21022097
"Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move .getReplica().getServer().ipToString());
21032098
throw new ObTableRoutingWrongException();
21042099
}
2100+
} else if (result != null && result.isRoutingWrong() && !isOdpMode()) {
2101+
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
2102+
obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta());
2103+
if (result.isNeedRefreshMeta()) {
2104+
tableRoute.refreshMeta(tableName);
2105+
}
2106+
if (request instanceof ObTableAbstractOperationRequest) {
2107+
long tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId();
2108+
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
2109+
} else if (request instanceof ObHbaseRequest) {
2110+
tableRoute.refreshTabletLocationBatch(tableName);
2111+
}
21052112
}
21062113
return result;
21072114
}
@@ -2365,7 +2372,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23652372
needRefreshTabletLocation = true;
23662373
if (ex instanceof ObTableNeedFetchMetaException) {
23672374
// Refresh table info
2368-
refreshMeta(request.getTableName());
2375+
tableRoute.refreshMeta(request.getTableName());
23692376
}
23702377
}
23712378
} else {
@@ -2386,6 +2393,36 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23862393
+ "is not supported. make sure the correct version");
23872394
}
23882395

2396+
public ObPayload execute(final ObHbaseRequest request) throws Exception {
2397+
if (request.getTableName() == null || request.getTableName().isEmpty()) {
2398+
throw new IllegalArgumentException("table name is null");
2399+
}
2400+
if (isOdpMode()) {
2401+
return getOdpTable().execute(request);
2402+
} else {
2403+
Row row = new Row();
2404+
// get the first cell from the first cfRows to route
2405+
// use the first table in tablegroup to route
2406+
String realTableName = null;
2407+
if (request.getCfRows().isEmpty()) {
2408+
throw new ObTableUnexpectedException("no cf rows");
2409+
}
2410+
if (request.getCfRows().size() > 1) {
2411+
realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false);
2412+
} else {
2413+
realTableName = request.getCfRows().get(0).getRealTableName();
2414+
}
2415+
int keyIdx = request.getCfRows().get(0).getKeyIndex(0);
2416+
row.add("K", request.getKeys().get(keyIdx).getValue());
2417+
row.add("Q", request.getCfRows().get(0).getCells().get(0).getQ().getValue());
2418+
row.add("T", request.getCfRows().get(0).getCells().get(0).getT().getValue());
2419+
ObTableParam tableParam = tableRoute.getTableParam(realTableName, row);
2420+
ObTable obTable = tableParam.getObTable();
2421+
request.setTimeout(obTable.getObTableOperationTimeout());
2422+
return executeWithRetry(obTable, request, realTableName);
2423+
}
2424+
}
2425+
23892426
private ObTableQueryAndMutate buildObTableQueryAndMutate(ObTableQuery obTableQuery,
23902427
ObTableBatchOperation obTableBatchOperation) {
23912428
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
2727
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
2828
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObFetchPartitionMetaResult;
29+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObHbaseResult;
2930
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
3031
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
3132
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
@@ -35,6 +36,7 @@
3536
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult;
3637
import com.alipay.remoting.CommandCode;
3738

39+
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_HBASE_EXECUTE;
3840
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_META_INFO_EXECUTE;
3941

4042
public enum ObTablePacketCode implements CommandCode {
@@ -143,6 +145,12 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
143145
public ObPayload newPayload(ObRpcPacketHeader header) {
144146
return new ObTableMetaResponse();
145147
}
148+
},
149+
OB_TABLE_API_HBASE_EXECUTE(Pcodes.OB_TABLE_API_HBASE_EXECUTE) {
150+
@Override
151+
public ObPayload newPayload(ObRpcPacketHeader header) {
152+
return new ObHbaseResult();
153+
}
146154
};
147155

148156
private short value;
@@ -185,8 +193,10 @@ public static ObTablePacketCode valueOf(short value) {
185193
return OB_TABLE_API_MOVE;
186194
case Pcodes.OB_ERROR_PACKET:
187195
return OB_ERROR_PACKET;
188-
case OB_TABLE_API_META_INFO_EXECUTE:
196+
case Pcodes.OB_TABLE_API_META_INFO_EXECUTE:
189197
return OB_TABLE_META_INFO_EXECUTE;
198+
case Pcodes.OB_TABLE_API_HBASE_EXECUTE:
199+
return OB_TABLE_API_HBASE_EXECUTE;
190200
}
191201
throw new IllegalArgumentException("Unknown Rpc command code value ," + value);
192202
}

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -119,42 +119,39 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
119119
}
120120

121121
// decode ResultCode for response packet
122+
boolean isRoutingWrong = false;
123+
boolean isNeedRefreshMeta = false;
122124
ObRpcResultCode resultCode = new ObRpcResultCode();
123125
resultCode.decode(buf);
124-
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125-
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
126-
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127-
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
128-
+ resultCode.getErrMsg());
129-
logger.debug(errMessage);
130-
if (needFetchMeta(resultCode.getRcode(), resultCode.getPcode())) {
131-
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
132-
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
133-
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
134-
} else {
135-
// Encountered an unexpected RoutingWrong error code,
136-
// possibly due to the client error code version being behind the observer's version.
137-
// Attempting a full refresh here
138-
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
139-
logger.warn("get unexpected error code: {}", errMessage);
140-
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
141-
}
142-
}
143-
if (resultCode.getRcode() != 0
144-
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
145-
String errMessage = TraceUtil.formatTraceMessage(conn, request,
146-
"meet exception: [error code:" + resultCode.getRcode() + "]"
147-
+ resultCode.getErrMsg());
148-
logger.debug(errMessage);
149-
if (needFetchMeta(resultCode.getRcode(), resultCode.getPcode())) {
150-
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
151-
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
152-
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
153-
} else {
154-
logger.warn(errMessage);
155-
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
156-
.getObTable().getPort(), response.getHeader().getTraceId1(), response
157-
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
126+
logger.debug("require_rerouting: {}, need_refresh_kv_meta: {}"
127+
, response.getHeader().isRoutingWrong(), response.getHeader().isNeedRefreshKvMeta());
128+
if (response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
129+
if (resultCode.getRcode() != 0) {
130+
String errMessage = TraceUtil.formatTraceMessage(conn, request,
131+
"meet exception: [error code:" + resultCode.getRcode() + "]"
132+
+ resultCode.getErrMsg());
133+
logger.debug(errMessage);
134+
if (needFetchMeta(resultCode.getRcode())) {
135+
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
136+
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
137+
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
138+
} else {
139+
logger.warn(errMessage);
140+
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
141+
.getObTable().getPort(), response.getHeader().getTraceId1(), response
142+
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
143+
}
144+
} else if (resultCode.getRcode() == 0 && response.getHeader().isRoutingWrong()) {
145+
// if distributed capability is supported and enabled
146+
// just need to refresh table entry, no need to retry
147+
String errMessage = TraceUtil.formatTraceMessage(conn, request,
148+
"meet exception and retry successfully in server: [require_rerouting :" + response.getHeader().isRoutingWrong()
149+
+ ", need_refresh_kv_meta :" + response.getHeader().isNeedRefreshKvMeta() + "]");
150+
logger.debug(errMessage);
151+
isRoutingWrong = true;
152+
if (response.getHeader().isNeedRefreshKvMeta()) {
153+
isNeedRefreshMeta = true;
154+
}
158155
}
159156
}
160157

@@ -165,6 +162,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
165162
.getHeader());
166163
payload.setSequence(response.getHeader().getTraceId1());
167164
payload.setUniqueId(response.getHeader().getTraceId0());
165+
payload.setIsRoutingWrong(isRoutingWrong);
166+
payload.setIsNeedRefreshMeta(isNeedRefreshMeta);
168167
} else {
169168
String errMessage = TraceUtil.formatTraceMessage(conn, response,
170169
"receive unexpected command code: " + response.getCmdCode().value());
@@ -192,7 +191,7 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
192191
}
193192

194193
// schema changed
195-
private boolean needFetchMeta(int errorCode, int pcode) {
194+
private boolean needFetchMeta(int errorCode) {
196195
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
197196
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
198197
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
@@ -202,8 +201,7 @@ private boolean needFetchMeta(int errorCode, int pcode) {
202201
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode
203202
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
204203
|| errorCode == ResultCodes.OB_GTS_NOT_READY.errorCode
205-
|| errorCode == ResultCodes.OB_ERR_OPERATION_ON_RECYCLE_OBJECT.errorCode
206-
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
204+
|| errorCode == ResultCodes.OB_ERR_OPERATION_ON_RECYCLE_OBJECT.errorCode;
207205
}
208206

209207
private boolean needFetchPartitionLocation(int errorCode) {

src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ public static ObTableException convertToObTableException(String host, int port,
8686
+ "]" + "[" + resultCodes.name() + "]" + "["
8787
+ errMsg + "]" + "[" + server + "]" + "["
8888
+ trace + "]", resultCodes.errorCode);
89+
} else if (resultCodes.errorCode == OB_KV_SESS_NOT_EXIST.errorCode) {
90+
return new ObTableSessionNotExistException("[" + String.valueOf(resultCodes.errorCode)
91+
+ "]" + "[" + resultCodes.name() + "]" + "["
92+
+ errMsg + "]" + "[" + server + "]" + "["
93+
+ trace + "]", resultCodes.errorCode);
8994
} else {
9095
// [errCode][errCodeName][errMsg][server][trace]
9196
return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.alipay.oceanbase.rpc.exception;
2+
3+
public class ObTableSessionNotExistException extends ObTableException {
4+
/*
5+
* Ob table query session not in server exception.
6+
*/
7+
public ObTableSessionNotExistException() {
8+
}
9+
10+
/*
11+
* Ob table query session not in server exception.
12+
*/
13+
public ObTableSessionNotExistException(int errorCode) {
14+
super(errorCode);
15+
}
16+
17+
/*
18+
* Ob table query session not in server exception.
19+
*/
20+
public ObTableSessionNotExistException(String message, int errorCode) {
21+
super(message, errorCode);
22+
}
23+
24+
/*
25+
* Ob table query session not in server exception.
26+
*/
27+
public ObTableSessionNotExistException(String message) {
28+
super(message);
29+
}
30+
31+
/*
32+
* Ob table query session not in server exception.
33+
*/
34+
public ObTableSessionNotExistException(String message, Throwable cause) {
35+
super(message, cause);
36+
}
37+
38+
/*
39+
* Ob table query session not in server exception.
40+
*/
41+
public ObTableSessionNotExistException(Throwable cause) {
42+
super(cause);
43+
}
44+
45+
/*
46+
* Is need refresh table entry.
47+
*/
48+
public boolean isNeedRefreshTableEntry() {
49+
return false;
50+
}
51+
}

0 commit comments

Comments
 (0)