Skip to content

Commit f1401b5

Browse files
authored
Add rerouting error feedback refreshing logic (#358)
* refresh table meta and locatino based on flag, refresh cache after retry successfully in server * remove lsop + OB_NOT_MASTER * change debug log to warn * make execute and operation time longer * add debug log * revert log level and operation timeout
1 parent fb6aa87 commit f1401b5

File tree

9 files changed

+123
-48
lines changed

9 files changed

+123
-48
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,7 @@ public void calculateContinuousFailure(String tableName, String errorMsg) throws
837837
if (failures.incrementAndGet() > runtimeContinuousFailureCeiling) {
838838
logger.warn("refresh table entry {} while execute failed times exceeded {}, msg: {}",
839839
tableName, runtimeContinuousFailureCeiling, errorMsg);
840-
refreshMeta(tableName);
840+
tableRoute.refreshMeta(tableName);
841841
failures.set(0);
842842
} else {
843843
logger.warn("error msg: {}, current continues failure count: {}", errorMsg, failures);
@@ -968,14 +968,6 @@ public TableEntry getOrRefreshTableEntry(final String tableName, boolean forceRe
968968
if (!forceRefresh) {
969969
return tableRoute.getTableEntry(tableName);
970970
}
971-
return refreshMeta(tableName);
972-
}
973-
974-
/**
975-
* refresh table meta information except location
976-
* @param tableName table name
977-
* */
978-
private TableEntry refreshMeta(String tableName) throws Exception {
979971
return tableRoute.refreshMeta(tableName);
980972
}
981973

@@ -2102,6 +2094,16 @@ public ObPayload executeWithRetry(ObTable obTable, ObPayload request, String tab
21022094
"Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move .getReplica().getServer().ipToString());
21032095
throw new ObTableRoutingWrongException();
21042096
}
2097+
} else if (result != null && result.isRoutingWrong()) {
2098+
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
2099+
obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta());
2100+
if (result.isNeedRefreshMeta()) {
2101+
tableRoute.refreshMeta(tableName);
2102+
}
2103+
if (request instanceof ObTableAbstractOperationRequest) {
2104+
long tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId();
2105+
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
2106+
}
21052107
}
21062108
return result;
21072109
}
@@ -2365,7 +2367,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23652367
needRefreshTabletLocation = true;
23662368
if (ex instanceof ObTableNeedFetchMetaException) {
23672369
// Refresh table info
2368-
refreshMeta(request.getTableName());
2370+
tableRoute.refreshMeta(request.getTableName());
23692371
}
23702372
}
23712373
} else {

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -119,42 +119,39 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
119119
}
120120

121121
// decode ResultCode for response packet
122+
boolean isRoutingWrong = false;
123+
boolean isNeedRefreshMeta = false;
122124
ObRpcResultCode resultCode = new ObRpcResultCode();
123125
resultCode.decode(buf);
124-
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125-
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
126-
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127-
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
128-
+ resultCode.getErrMsg());
129-
logger.debug(errMessage);
130-
if (needFetchMeta(resultCode.getRcode(), resultCode.getPcode())) {
131-
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
132-
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
133-
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
134-
} else {
135-
// Encountered an unexpected RoutingWrong error code,
136-
// possibly due to the client error code version being behind the observer's version.
137-
// Attempting a full refresh here
138-
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
139-
logger.warn("get unexpected error code: {}", errMessage);
140-
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
141-
}
142-
}
143-
if (resultCode.getRcode() != 0
144-
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
145-
String errMessage = TraceUtil.formatTraceMessage(conn, request,
146-
"meet exception: [error code:" + resultCode.getRcode() + "]"
147-
+ resultCode.getErrMsg());
148-
logger.debug(errMessage);
149-
if (needFetchMeta(resultCode.getRcode(), resultCode.getPcode())) {
150-
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
151-
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
152-
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
153-
} else {
154-
logger.warn(errMessage);
155-
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
156-
.getObTable().getPort(), response.getHeader().getTraceId1(), response
157-
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
126+
logger.debug("require_rerouting_: {}, need_refresh_kv_meta_: {}"
127+
, response.getHeader().isRoutingWrong(), response.getHeader().isNeedRefreshKvMeta());
128+
if (response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
129+
if (resultCode.getRcode() != 0) {
130+
String errMessage = TraceUtil.formatTraceMessage(conn, request,
131+
"meet exception: [error code:" + resultCode.getRcode() + "]"
132+
+ resultCode.getErrMsg());
133+
logger.debug(errMessage);
134+
if (needFetchMeta(resultCode.getRcode())) {
135+
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
136+
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
137+
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
138+
} else {
139+
logger.warn(errMessage);
140+
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
141+
.getObTable().getPort(), response.getHeader().getTraceId1(), response
142+
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
143+
}
144+
} else if (resultCode.getRcode() == 0 && response.getHeader().isRoutingWrong()) {
145+
// if distributed capability is supported and enabled
146+
// just need to refresh table entry, no need to retry
147+
String errMessage = TraceUtil.formatTraceMessage(conn, request,
148+
"meet exception and retry successfully in server: [require_rerouting :" + response.getHeader().isRoutingWrong()
149+
+ ", need_refresh_kv_meta :" + response.getHeader().isNeedRefreshKvMeta() + "]");
150+
logger.debug(errMessage);
151+
isRoutingWrong = true;
152+
if (response.getHeader().isNeedRefreshKvMeta()) {
153+
isNeedRefreshMeta = true;
154+
}
158155
}
159156
}
160157

@@ -165,6 +162,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
165162
.getHeader());
166163
payload.setSequence(response.getHeader().getTraceId1());
167164
payload.setUniqueId(response.getHeader().getTraceId0());
165+
payload.setIsRoutingWrong(isRoutingWrong);
166+
payload.setIsNeedRefreshMeta(isNeedRefreshMeta);
168167
} else {
169168
String errMessage = TraceUtil.formatTraceMessage(conn, response,
170169
"receive unexpected command code: " + response.getCmdCode().value());
@@ -192,7 +191,7 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
192191
}
193192

194193
// schema changed
195-
private boolean needFetchMeta(int errorCode, int pcode) {
194+
private boolean needFetchMeta(int errorCode) {
196195
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
197196
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
198197
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
@@ -202,8 +201,7 @@ private boolean needFetchMeta(int errorCode, int pcode) {
202201
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode
203202
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
204203
|| errorCode == ResultCodes.OB_GTS_NOT_READY.errorCode
205-
|| errorCode == ResultCodes.OB_ERR_OPERATION_ON_RECYCLE_OBJECT.errorCode
206-
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
204+
|| errorCode == ResultCodes.OB_ERR_OPERATION_ON_RECYCLE_OBJECT.errorCode;
207205
}
208206

209207
private boolean needFetchPartitionLocation(int errorCode) {

src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ private void validCachedObTableStatus(String tableName, TableEntry tableEntry, l
662662
ReplicaLocation replica = getPartitionLocation(obPartitionLocationInfo, route);
663663
ObServerAddr addr = replica.getAddr();
664664
ObTable obTable = tableRoster.getTable(addr);
665-
if (obTable != null) {
665+
if (obTable != null && !obTable.isValid()) {
666666
obTable.setValid();
667667
}
668668
}

src/main/java/com/alipay/oceanbase/rpc/protocol/packet/ObRpcPacketHeader.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,14 @@ public void setRoutingWrong() {
296296
flag |= REQUIRE_REROUTING_FLAG;
297297
}
298298

299+
/*
300+
* need to refresh kv table meta
301+
* */
302+
public boolean isNeedRefreshKvMeta() {
303+
return (flag & IS_KV_REQUEST_FALG) != 0;
304+
}
305+
306+
299307
/*
300308
* Set stream next.
301309
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public abstract class AbstractPayload implements ObPayload {
4242
private long uniqueId;
4343
private long sequence;
4444
private Integer channelId = null;
45+
private boolean isRoutingWrong = false; // flag means tableEntry location or meta need to be refreshed
46+
private boolean isNeedRefreshMeta = false; // flag means tableEntry meta need to be refreshed
4547
protected long tenantId = 1;
4648
private long version = 1;
4749
protected long timeout = RPC_OPERATION_TIMEOUT.getDefaultLong();
@@ -66,6 +68,38 @@ public long getTimeout() {
6668
return timeout;
6769
}
6870

71+
/*
72+
* Get isRoutingWrong
73+
* */
74+
@Override
75+
public boolean isRoutingWrong() {
76+
return this.isRoutingWrong;
77+
}
78+
79+
/*
80+
* Set isRoutingWrong
81+
* */
82+
@Override
83+
public void setIsRoutingWrong(boolean isRoutingWrong) {
84+
this.isRoutingWrong = isRoutingWrong;
85+
}
86+
87+
/*
88+
* Get isNeedRefreshMeta
89+
* */
90+
@Override
91+
public boolean isNeedRefreshMeta() {
92+
return this.isNeedRefreshMeta;
93+
}
94+
95+
/*
96+
* Set isNeedRefreshMeta
97+
* */
98+
@Override
99+
public void setIsNeedRefreshMeta(boolean isNeedRefreshMeta) {
100+
this.isNeedRefreshMeta = isNeedRefreshMeta;
101+
}
102+
69103
/*
70104
* Get version.
71105
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ObPayload.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ public interface ObPayload extends ObUnisVersion {
4141
*/
4242
long getTimeout();
4343

44+
boolean isRoutingWrong();
45+
46+
void setIsRoutingWrong(boolean isRoutingWrong);
47+
48+
boolean isNeedRefreshMeta();
49+
50+
void setIsNeedRefreshMeta(boolean isNeedRefreshMeta);
51+
4452
/*
4553
* set sequence
4654
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,14 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
174174
.getServer().ipToString());
175175
throw new ObTableRoutingWrongException();
176176
}
177+
} else if (result != null && result.isRoutingWrong()) {
178+
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
179+
subObTable.getIp(), subObTable.getPort(), indexTableName, result.isNeedRefreshMeta());
180+
TableEntry tableEntry = result.isNeedRefreshMeta() ?
181+
client.getOrRefreshTableEntry(indexTableName, true) :
182+
client.getOrRefreshTableEntry(indexTableName, false);
183+
long tabletId = client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft());
184+
client.refreshTableLocationByTabletId(indexTableName, tabletId);
177185
}
178186
}
179187
client.resetExecuteContinuousFailureCount(indexTableName);

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,14 @@ public void partitionExecute(ObTableOperationResult[] results,
378378
.ipToString());
379379
throw new ObTableRoutingWrongException();
380380
}
381+
} else if (result != null && result.isRoutingWrong()) {
382+
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
383+
subObTable.getIp(), subObTable.getPort(), tableName, result.isNeedRefreshMeta());
384+
TableEntry entry = result.isNeedRefreshMeta() ?
385+
obTableClient.getOrRefreshTableEntry(tableName, true) :
386+
obTableClient.getOrRefreshTableEntry(tableName, false);
387+
long tabletId = obTableClient.getTabletIdByPartId(entry, originPartId);
388+
obTableClient.refreshTableLocationByTabletId(tableName, tabletId);
381389
}
382390
subObTableBatchOperationResult = (ObTableBatchOperationResult) result;
383391
obTableClient.resetExecuteContinuousFailureCount(tableName);

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,15 @@ public void partitionExecute(ObTableSingleOpResult[] results,
645645
"Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move.getReplica().getServer().ipToString());
646646
throw new ObTableRoutingWrongException();
647647
}
648+
} else if (result != null && result.isRoutingWrong()) {
649+
// retry successfully in server and need to refresh client cache
650+
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
651+
subObTable.getIp(), subObTable.getPort(), realTableName, result.isNeedRefreshMeta());
652+
if (result.isNeedRefreshMeta()) {
653+
obTableClient.getOrRefreshTableEntry(realTableName, true);
654+
}
655+
// TODO: 如果是不需要全部刷新地址的错误,全部刷新地址会降低效率。如何确定出错的 tablet_id 并刷新?
656+
obTableClient.refreshTabletLocationBatch(realTableName);
648657
}
649658
subLSOpResult = (ObTableLSOpResult) result;
650659
obTableClient.resetExecuteContinuousFailureCount(realTableName);

0 commit comments

Comments
 (0)