Skip to content

Commit 30fa10e

Browse files
authored
Merge pull request #153 from maochongxin/fix_truncate_table_error
Fix: truncate table param error && faster refreshing of the routing table for specific error code
2 parents 45203ea + 4331c69 commit 30fa10e

File tree

10 files changed

+187
-80
lines changed

10 files changed

+187
-80
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 45 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import com.alipay.remoting.util.StringUtils;
4646
import org.slf4j.Logger;
4747

48-
import java.io.IOException;
4948
import java.util.*;
5049
import java.util.concurrent.ConcurrentHashMap;
5150
import java.util.concurrent.TimeUnit;
@@ -539,8 +538,8 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
539538
if (odpMode) {
540539
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
541540
} else {
542-
obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry,
543-
tableEntryRefreshIntervalWait, route);
541+
obPair = getTableBySingleRowKeyWithRoute(tableName, callback.getRowKey(),
542+
needRefreshTableEntry, tableEntryRefreshIntervalWait, route);
544543
}
545544
T t = callback.execute(obPair);
546545
resetExecuteContinuousFailureCount(tableName);
@@ -696,12 +695,13 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
696695
} else {
697696
if (null != callback.getRowKey()) {
698697
// using row key
699-
obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry,
700-
tableEntryRefreshIntervalWait, route);
698+
obPair = getTableBySingleRowKeyWithRoute(tableName, callback.getRowKey(),
699+
needRefreshTableEntry, tableEntryRefreshIntervalWait, route);
701700
} else if (null != callback.getKeyRanges()) {
702701
// using scan range
703-
obPair = getTable(tableName, new ObTableQuery(), callback.getKeyRanges(),
704-
needRefreshTableEntry, tableEntryRefreshIntervalWait, route);
702+
obPair = getTableByRowKeyRange(tableName, new ObTableQuery(),
703+
callback.getKeyRanges(), needRefreshTableEntry,
704+
tableEntryRefreshIntervalWait, route);
705705
} else {
706706
throw new ObTableException("rowkey and scan range are null in mutation");
707707
}
@@ -788,6 +788,7 @@ public void calculateContinuousFailure(String tableName, String errorMsg) throws
788788
}
789789
}
790790

791+
791792
/**
792793
* Reset execute continuous failure count.
793794
* @param tableName table name
@@ -1033,19 +1034,6 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean fo
10331034
}
10341035
}
10351036

1036-
/**
1037-
* Get or refresh table entry.
1038-
* @param tableName table name
1039-
* @param refresh is re-fresh
1040-
* @param waitForRefresh wait re-fresh
1041-
* @return this
1042-
* @throws Exception if fail
1043-
*/
1044-
public TableEntry getOrRefreshTableEntry(final String tableName, final boolean refresh,
1045-
final boolean waitForRefresh) throws Exception {
1046-
return getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
1047-
}
1048-
10491037
/**
10501038
* Get or refresh table entry.
10511039
* @param tableName table name
@@ -1359,9 +1347,10 @@ private long getPartition(TableEntry tableEntry, Object[] rowKey) {
13591347
/*
13601348
* Get logicId(partition id in 3.x) from giving range
13611349
*/
1362-
private List<Long> getPartitionsForLevelTwo(TableEntry tableEntry, List<String> scanRangeColumns,
1363-
Object[] start, boolean startIncluded,
1364-
Object[] end, boolean endIncluded) throws Exception {
1350+
private List<Long> getPartitionsForLevelTwo(TableEntry tableEntry,
1351+
List<String> scanRangeColumns, Object[] start,
1352+
boolean startIncluded, Object[] end,
1353+
boolean endIncluded) throws Exception {
13651354
if (tableEntry.getPartitionInfo().getLevel() != ObPartitionLevel.LEVEL_TWO) {
13661355
RUNTIME.error("getPartitionsForLevelTwo need ObPartitionLevel LEVEL_TWO");
13671356
throw new Exception("getPartitionsForLevelTwo need ObPartitionLevel LEVEL_TWO");
@@ -1443,9 +1432,11 @@ private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
14431432
* @return ObPair of partId and table
14441433
* @throws Exception exception
14451434
*/
1446-
public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, boolean refresh,
1447-
boolean waitForRefresh) throws Exception {
1448-
return getTable(tableName, rowKey, refresh, waitForRefresh, getRoute(false));
1435+
public ObPair<Long, ObTableParam> getTableBySingleRowKey(String tableName, Object[] rowKey,
1436+
boolean refresh, boolean waitForRefresh)
1437+
throws Exception {
1438+
ObServerRoute route = getRoute(false);
1439+
return getTableBySingleRowKeyWithRoute(tableName, rowKey, refresh, waitForRefresh, route);
14491440
}
14501441

14511442
/**
@@ -1458,14 +1449,17 @@ public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, bo
14581449
* @return ObPair of partId and table
14591450
* @throws Exception exception
14601451
*/
1461-
public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, boolean refresh,
1462-
boolean waitForRefresh, ObServerRoute route)
1463-
throws Exception {
1464-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh);
1452+
public ObPair<Long, ObTableParam> getTableBySingleRowKeyWithRoute(String tableName,
1453+
Object[] rowKey,
1454+
boolean refresh,
1455+
boolean waitForRefresh,
1456+
ObServerRoute route)
1457+
throws Exception {
1458+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
14651459

14661460
long partId = getPartition(tableEntry, rowKey); // partition id in 3.x, origin partId in 4.x, logicId
14671461

1468-
return getTable(tableName, tableEntry, partId, waitForRefresh, route);
1462+
return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route);
14691463
}
14701464

14711465
/**
@@ -1478,8 +1472,8 @@ public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, bo
14781472
* @return ObPair of partId and table
14791473
* @throws Exception exception
14801474
*/
1481-
public ObPair<Long, ObTableParam> getTable(String tableName, ObTableQuery query, List<ObNewRange> keyRanges, boolean refresh,
1482-
boolean waitForRefresh, ObServerRoute route)
1475+
public ObPair<Long, ObTableParam> getTableByRowKeyRange(String tableName, ObTableQuery query, List<ObNewRange> keyRanges, boolean refresh,
1476+
boolean waitForRefresh, ObServerRoute route)
14831477
throws Exception {
14841478
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
14851479
for (ObNewRange rang : keyRanges) {
@@ -1529,11 +1523,11 @@ public ObPair<Long, ObTableParam> getTable(String tableName, ObTableQuery query,
15291523
* @return ObPair of partId and table
15301524
* @throws Exception exception
15311525
*/
1532-
public ObPair<Long, ObTableParam> getTable(String tableName, long partId, boolean refresh,
1533-
boolean waitForRefresh, ObServerRoute route)
1534-
throws Exception {
1535-
return getTable(tableName, getOrRefreshTableEntry(tableName, refresh, waitForRefresh),
1536-
partId, waitForRefresh, route);
1526+
public ObPair<Long, ObTableParam> getTableWithPartId(String tableName, long partId,
1527+
boolean refresh, boolean waitForRefresh, boolean needFetchAll,
1528+
ObServerRoute route) throws Exception {
1529+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, needFetchAll);
1530+
return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route);
15371531
}
15381532

15391533
/**
@@ -1546,9 +1540,9 @@ public ObPair<Long, ObTableParam> getTable(String tableName, long partId, boolea
15461540
* @return ObPair of partId and table
15471541
* @throws Exception exception
15481542
*/
1549-
public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEntry,
1550-
long partId, boolean waitForRefresh,
1551-
ObServerRoute route) throws Exception {
1543+
public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry tableEntry,
1544+
long partId, boolean waitForRefresh,
1545+
ObServerRoute route) throws Exception {
15521546
ObPair<Long, ReplicaLocation> partitionReplica = getPartitionReplica(tableEntry, partId,
15531547
route);
15541548

@@ -1566,7 +1560,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEnt
15661560
logger.info("server addr {} is expired, refresh tableEntry.", addr);
15671561
}
15681562

1569-
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh);
1563+
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
15701564
replica = getPartitionReplica(tableEntry, partId, route).getRight();
15711565
addr = replica.getAddr();
15721566
obTable = tableRoster.get(addr);
@@ -1653,10 +1647,11 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
16531647
* @return list of ObPair of partId(logicId) and table obTableParams
16541648
* @throws Exception exception
16551649
*/
1656-
public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery query, Object[] start,
1657-
boolean startInclusive, Object[] end,
1658-
boolean endInclusive, boolean refresh,
1659-
boolean waitForRefresh) throws Exception {
1650+
public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery query,
1651+
Object[] start, boolean startInclusive,
1652+
Object[] end, boolean endInclusive,
1653+
boolean refresh, boolean waitForRefresh)
1654+
throws Exception {
16601655
return getTables(tableName, query, start, startInclusive, end, endInclusive, refresh,
16611656
waitForRefresh, getRoute(false));
16621657
}
@@ -1681,7 +1676,7 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
16811676
throws Exception {
16821677

16831678
// 1. get TableEntry information
1684-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh);
1679+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
16851680

16861681
List<String> scanRangeColumns = query.getScanRangeColumns();
16871682
if (scanRangeColumns == null || scanRangeColumns.size() == 0) {
@@ -1710,7 +1705,7 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
17101705
"server address {} is expired={} or can not get ob table. So that will sync refresh metadata",
17111706
addr, addrExpired);
17121707
syncRefreshMetadata();
1713-
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh);
1708+
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
17141709
replica = getPartitionLocation(tableEntry, partId, route);
17151710
addr = replica.getAddr();
17161711
obTable = tableRoster.get(addr);
@@ -3316,7 +3311,7 @@ public byte[][][] getFirstPartStartKeys(String tableName) throws Exception {
33163311
}
33173312

33183313
// Get the latest table entry
3319-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, true, false);
3314+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, true, false, false);
33203315

33213316
// Define start keys
33223317
byte[][][] firstPartStartKeys = new byte[0][][];
@@ -3384,7 +3379,7 @@ public byte[][][] getFirstPartEndKeys(String tableName) throws Exception {
33843379
}
33853380

33863381
// Get the latest table entry
3387-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, true, false);
3382+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, true, false, false);
33883383

33893384
// Define end keys
33903385
byte[][][] firstPartEndKeys = new byte[0][][];

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121
import com.alipay.oceanbase.rpc.bolt.protocol.ObTablePacketCode;
2222
import com.alipay.oceanbase.rpc.exception.*;
2323
import com.alipay.oceanbase.rpc.protocol.packet.ObCompressType;
24-
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
25-
import com.alipay.oceanbase.rpc.protocol.payload.Credentialable;
26-
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
27-
import com.alipay.oceanbase.rpc.protocol.payload.ObRpcResultCode;
24+
import com.alipay.oceanbase.rpc.protocol.payload.*;
2825
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginRequest;
2926
import com.alipay.oceanbase.rpc.util.ObPureCrc32C;
3027
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
@@ -108,14 +105,6 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
108105
throw new FeatureNotSupportedException(errMessage);
109106
}
110107
ByteBuf buf = response.getPacketContentBuf();
111-
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
112-
if (response.getHeader().isRoutingWrong()) {
113-
String errMessage = TraceUtil.formatTraceMessage(conn, request,
114-
"routed to the wrong server: " + response.getMessage());
115-
logger.warn(errMessage);
116-
throw new ObTableRoutingWrongException(errMessage);
117-
}
118-
119108
// verify checksum
120109
long expected_checksum = response.getHeader().getChecksum();
121110
byte[] content = new byte[buf.readableBytes()];
@@ -132,7 +121,24 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
132121
// decode ResultCode for response packet
133122
ObRpcResultCode resultCode = new ObRpcResultCode();
134123
resultCode.decode(buf);
135-
124+
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125+
if (response.getHeader().isRoutingWrong()) {
126+
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127+
"routed to the wrong server: " + response.getMessage());
128+
logger.warn(errMessage);
129+
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
130+
throw new ObTableNeedFetchAllException(errMessage);
131+
} else if (needFetchPartial(resultCode.getRcode())) {
132+
throw new ObTableRoutingWrongException(errMessage);
133+
} else {
134+
// Encountered an unexpected RoutingWrong error code,
135+
// possibly due to the client error code version being behind the observer's version.
136+
// Attempting a full refresh here
137+
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
138+
logger.warn("get unexpected error code: {}", response.getMessage());
139+
throw new ObTableNeedFetchAllException(errMessage);
140+
}
141+
}
136142
if (resultCode.getRcode() != 0) {
137143
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn.getObTable()
138144
.getPort(), response.getHeader().getTraceId1(), response.getHeader()
@@ -173,4 +179,29 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
173179
return new ObClientFuture(request.getId());
174180
}
175181

182+
// schema changed
183+
private boolean needFetchAll(int errorCode, int pcode) {
184+
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
185+
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
186+
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
187+
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
188+
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
189+
}
190+
private boolean needFetchPartial(int errorCode) {
191+
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
192+
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
193+
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
194+
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
195+
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
196+
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
197+
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
198+
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
199+
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
200+
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
201+
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
202+
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
203+
|| errorCode == ResultCodes.OB_TENANT_NOT_IN_SERVER.errorCode
204+
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
205+
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
206+
}
176207
}

0 commit comments

Comments
 (0)