Skip to content

Commit 71a3e1a

Browse files
committed
Refactor: Rename overloaded getTable methods for clarity
1 parent 95ae3e4 commit 71a3e1a

File tree

6 files changed

+127
-65
lines changed

6 files changed

+127
-65
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 44 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import com.alipay.remoting.util.StringUtils;
4646
import org.slf4j.Logger;
4747

48-
import java.io.IOException;
4948
import java.util.*;
5049
import java.util.concurrent.ConcurrentHashMap;
5150
import java.util.concurrent.TimeUnit;
@@ -539,8 +538,8 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
539538
if (odpMode) {
540539
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
541540
} else {
542-
obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry,
543-
tableEntryRefreshIntervalWait, route);
541+
obPair = getTableBySingleRowKeyWithRoute(tableName, callback.getRowKey(),
542+
needRefreshTableEntry, tableEntryRefreshIntervalWait, route);
544543
}
545544
T t = callback.execute(obPair);
546545
resetExecuteContinuousFailureCount(tableName);
@@ -696,12 +695,13 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
696695
} else {
697696
if (null != callback.getRowKey()) {
698697
// using row key
699-
obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry,
700-
tableEntryRefreshIntervalWait, route);
698+
obPair = getTableBySingleRowKeyWithRoute(tableName, callback.getRowKey(),
699+
needRefreshTableEntry, tableEntryRefreshIntervalWait, route);
701700
} else if (null != callback.getKeyRanges()) {
702701
// using scan range
703-
obPair = getTable(tableName, new ObTableQuery(), callback.getKeyRanges(),
704-
needRefreshTableEntry, tableEntryRefreshIntervalWait, route);
702+
obPair = getTableByRowKeyRange(tableName, new ObTableQuery(),
703+
callback.getKeyRanges(), needRefreshTableEntry,
704+
tableEntryRefreshIntervalWait, route);
705705
} else {
706706
throw new ObTableException("rowkey and scan range are null in mutation");
707707
}
@@ -1040,19 +1040,6 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean fo
10401040
}
10411041
}
10421042

1043-
/**
1044-
* Get or refresh table entry.
1045-
* @param tableName table name
1046-
* @param refresh is re-fresh
1047-
* @param waitForRefresh wait re-fresh
1048-
* @return this
1049-
* @throws Exception if fail
1050-
*/
1051-
public TableEntry getOrRefreshTableEntry(final String tableName, final boolean refresh,
1052-
final boolean waitForRefresh) throws Exception {
1053-
return getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
1054-
}
1055-
10561043
/**
10571044
* Get or refresh table entry.
10581045
* @param tableName table name
@@ -1366,9 +1353,10 @@ private long getPartition(TableEntry tableEntry, Object[] rowKey) {
13661353
/*
13671354
* Get logicId(partition id in 3.x) from giving range
13681355
*/
1369-
private List<Long> getPartitionsForLevelTwo(TableEntry tableEntry, List<String> scanRangeColumns,
1370-
Object[] start, boolean startIncluded,
1371-
Object[] end, boolean endIncluded) throws Exception {
1356+
private List<Long> getPartitionsForLevelTwo(TableEntry tableEntry,
1357+
List<String> scanRangeColumns, Object[] start,
1358+
boolean startIncluded, Object[] end,
1359+
boolean endIncluded) throws Exception {
13721360
if (tableEntry.getPartitionInfo().getLevel() != ObPartitionLevel.LEVEL_TWO) {
13731361
RUNTIME.error("getPartitionsForLevelTwo need ObPartitionLevel LEVEL_TWO");
13741362
throw new Exception("getPartitionsForLevelTwo need ObPartitionLevel LEVEL_TWO");
@@ -1450,9 +1438,11 @@ private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
14501438
* @return ObPair of partId and table
14511439
* @throws Exception exception
14521440
*/
1453-
public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, boolean refresh,
1454-
boolean waitForRefresh) throws Exception {
1455-
return getTable(tableName, rowKey, refresh, waitForRefresh, getRoute(false));
1441+
public ObPair<Long, ObTableParam> getTableBySingleRowKey(String tableName, Object[] rowKey,
1442+
boolean refresh, boolean waitForRefresh)
1443+
throws Exception {
1444+
ObServerRoute route = getRoute(false);
1445+
return getTableBySingleRowKeyWithRoute(tableName, rowKey, refresh, waitForRefresh, route);
14561446
}
14571447

14581448
/**
@@ -1465,14 +1455,17 @@ public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, bo
14651455
* @return ObPair of partId and table
14661456
* @throws Exception exception
14671457
*/
1468-
public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, boolean refresh,
1469-
boolean waitForRefresh, ObServerRoute route)
1470-
throws Exception {
1471-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh);
1458+
public ObPair<Long, ObTableParam> getTableBySingleRowKeyWithRoute(String tableName,
1459+
Object[] rowKey,
1460+
boolean refresh,
1461+
boolean waitForRefresh,
1462+
ObServerRoute route)
1463+
throws Exception {
1464+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
14721465

14731466
long partId = getPartition(tableEntry, rowKey); // partition id in 3.x, origin partId in 4.x, logicId
14741467

1475-
return getTable(tableName, tableEntry, partId, waitForRefresh, route);
1468+
return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route);
14761469
}
14771470

14781471
/**
@@ -1485,8 +1478,8 @@ public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, bo
14851478
* @return ObPair of partId and table
14861479
* @throws Exception exception
14871480
*/
1488-
public ObPair<Long, ObTableParam> getTable(String tableName, ObTableQuery query, List<ObNewRange> keyRanges, boolean refresh,
1489-
boolean waitForRefresh, ObServerRoute route)
1481+
public ObPair<Long, ObTableParam> getTableByRowKeyRange(String tableName, ObTableQuery query, List<ObNewRange> keyRanges, boolean refresh,
1482+
boolean waitForRefresh, ObServerRoute route)
14901483
throws Exception {
14911484
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
14921485
for (ObNewRange rang : keyRanges) {
@@ -1536,11 +1529,11 @@ public ObPair<Long, ObTableParam> getTable(String tableName, ObTableQuery query,
15361529
* @return ObPair of partId and table
15371530
* @throws Exception exception
15381531
*/
1539-
public ObPair<Long, ObTableParam> getTable(String tableName, long partId, boolean refresh,
1540-
boolean waitForRefresh, ObServerRoute route)
1541-
throws Exception {
1542-
return getTable(tableName, getOrRefreshTableEntry(tableName, refresh, waitForRefresh),
1543-
partId, waitForRefresh, route);
1532+
public ObPair<Long, ObTableParam> getTableWithPartId(String tableName, long partId,
1533+
boolean refresh, boolean waitForRefresh,
1534+
ObServerRoute route) throws Exception {
1535+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
1536+
return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route);
15441537
}
15451538

15461539
/**
@@ -1553,9 +1546,9 @@ public ObPair<Long, ObTableParam> getTable(String tableName, long partId, boolea
15531546
* @return ObPair of partId and table
15541547
* @throws Exception exception
15551548
*/
1556-
public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEntry,
1557-
long partId, boolean waitForRefresh,
1558-
ObServerRoute route) throws Exception {
1549+
public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry tableEntry,
1550+
long partId, boolean waitForRefresh,
1551+
ObServerRoute route) throws Exception {
15591552
ObPair<Long, ReplicaLocation> partitionReplica = getPartitionReplica(tableEntry, partId,
15601553
route);
15611554

@@ -1573,7 +1566,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEnt
15731566
logger.info("server addr {} is expired, refresh tableEntry.", addr);
15741567
}
15751568

1576-
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh);
1569+
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
15771570
replica = getPartitionReplica(tableEntry, partId, route).getRight();
15781571
addr = replica.getAddr();
15791572
obTable = tableRoster.get(addr);
@@ -1660,10 +1653,11 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
16601653
* @return list of ObPair of partId(logicId) and table obTableParams
16611654
* @throws Exception exception
16621655
*/
1663-
public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery query, Object[] start,
1664-
boolean startInclusive, Object[] end,
1665-
boolean endInclusive, boolean refresh,
1666-
boolean waitForRefresh) throws Exception {
1656+
public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery query,
1657+
Object[] start, boolean startInclusive,
1658+
Object[] end, boolean endInclusive,
1659+
boolean refresh, boolean waitForRefresh)
1660+
throws Exception {
16671661
return getTables(tableName, query, start, startInclusive, end, endInclusive, refresh,
16681662
waitForRefresh, getRoute(false));
16691663
}
@@ -1688,7 +1682,7 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
16881682
throws Exception {
16891683

16901684
// 1. get TableEntry information
1691-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh);
1685+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
16921686

16931687
List<String> scanRangeColumns = query.getScanRangeColumns();
16941688
if (scanRangeColumns == null || scanRangeColumns.size() == 0) {
@@ -1717,7 +1711,7 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
17171711
"server address {} is expired={} or can not get ob table. So that will sync refresh metadata",
17181712
addr, addrExpired);
17191713
syncRefreshMetadata();
1720-
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh);
1714+
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
17211715
replica = getPartitionLocation(tableEntry, partId, route);
17221716
addr = replica.getAddr();
17231717
obTable = tableRoster.get(addr);
@@ -3323,7 +3317,7 @@ public byte[][][] getFirstPartStartKeys(String tableName) throws Exception {
33233317
}
33243318

33253319
// Get the latest table entry
3326-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, true, false);
3320+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, true, false, false);
33273321

33283322
// Define start keys
33293323
byte[][][] firstPartStartKeys = new byte[0][][];
@@ -3391,7 +3385,7 @@ public byte[][][] getFirstPartEndKeys(String tableName) throws Exception {
33913385
}
33923386

33933387
// Get the latest table entry
3394-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, true, false);
3388+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, true, false, false);
33953389

33963390
// Define end keys
33973391
byte[][][] firstPartEndKeys = new byte[0][][];

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
127127
"routed to the wrong server: " + response.getMessage());
128128
logger.warn(errMessage);
129129
if (needFetchAll(resultCode.getRcode())) {
130-
throw new ObTableRoutingWrongException(errMessage);
130+
throw new ObTableNeedFetchAllException(errMessage);
131131
} else {
132-
throw new ObTableMasterChangeException(errMessage);
132+
throw new ObTableRoutingWrongException(errMessage);
133133
}
134134
}
135135
if (resultCode.getRcode() != 0) {
@@ -178,7 +178,6 @@ private boolean needFetchAll(int errorCode) {
178178
|| errorCode == ResultCodes.OB_UNKNOWN_PARTITION.errorCode
179179
|| errorCode == ResultCodes.OB_PARTITION_NOT_MATCH.errorCode
180180
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
181-
|| errorCode == ResultCodes.OB_REPLICA_NOT_READABLE.errorCode
182-
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode;
181+
|| errorCode == ResultCodes.OB_REPLICA_NOT_READABLE.errorCode;
183182
}
184183
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.exception;
19+
20+
public class ObTableNeedFetchAllException extends ObTableException {
21+
/*
22+
* Ob table routing wrong exception.
23+
*/
24+
public ObTableNeedFetchAllException() {
25+
}
26+
27+
/*
28+
* Ob table routing wrong exception with error code.
29+
*/
30+
public ObTableNeedFetchAllException(int errorCode) {
31+
super(errorCode);
32+
}
33+
34+
/*
35+
* Ob table routing wrong exception with message and error code.
36+
*/
37+
public ObTableNeedFetchAllException(String message, int errorCode) {
38+
super(message, errorCode);
39+
}
40+
41+
/*
42+
* Ob table routing wrong exception with message.
43+
*/
44+
public ObTableNeedFetchAllException(String message) {
45+
super(message);
46+
}
47+
48+
/*
49+
* Ob table routing wrong exception with message and cause.
50+
*/
51+
public ObTableNeedFetchAllException(String message, Throwable cause) {
52+
super(message, cause);
53+
}
54+
55+
/*
56+
* Ob table routing wrong exception with cause.
57+
*/
58+
public ObTableNeedFetchAllException(Throwable cause) {
59+
super(cause);
60+
}
61+
62+
/*
63+
* Is need refresh table entry.
64+
*/
65+
public boolean isNeedRefreshTableEntry() {
66+
return true;
67+
}
68+
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
137137
route.setBlackList(failedServerList);
138138
}
139139
subObTable = client
140-
.getTable(indexTableName, partIdWithIndex.getLeft(),
140+
.getTableWithPartId(indexTableName, partIdWithIndex.getLeft(),
141141
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
142142
route).getRight().getObTable();
143143
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2424
import com.alipay.oceanbase.rpc.mutation.result.*;
2525
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
26-
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2726
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
2827
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
2928
import com.alipay.oceanbase.rpc.threadlocal.ThreadLocalMap;
@@ -244,8 +243,9 @@ public Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>>
244243
for (int j = 0; j < rowKeySize; j++) {
245244
rowKey[j] = rowKeyObject.getObj(j).getValue();
246245
}
247-
ObPair<Long, ObTableParam> tableObPair = obTableClient.getTable(tableName, rowKey,
248-
false, false, obTableClient.getRoute(batchOperation.isReadOnly()));
246+
ObPair<Long, ObTableParam> tableObPair = obTableClient.getTableBySingleRowKeyWithRoute(
247+
tableName, rowKey, false, false,
248+
obTableClient.getRoute(batchOperation.isReadOnly()));
249249
ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>> obTableOperations = partitionOperationsMap
250250
.get(tableObPair.getLeft());
251251
if (obTableOperations == null) {
@@ -332,9 +332,9 @@ public void partitionExecute(ObTableOperationResult[] results,
332332
if (failedServerList != null) {
333333
route.setBlackList(failedServerList);
334334
}
335-
ObTableParam newParam = obTableClient
336-
.getTable(tableName, originPartId, needRefreshTableEntry,
337-
obTableClient.isTableEntryRefreshIntervalWait(), route).getRight();
335+
ObTableParam newParam = obTableClient.getTableWithPartId(tableName,
336+
originPartId, needRefreshTableEntry,
337+
obTableClient.isTableEntryRefreshIntervalWait(), route).getRight();
338338

339339
subObTable = newParam.getObTable();
340340
subRequest.setPartitionId(newParam.getPartitionId());
@@ -382,8 +382,9 @@ public void partitionExecute(ObTableOperationResult[] results,
382382
"tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}",
383383
tableName, partId, ((ObTableException) ex).getErrorCode(),
384384
tryTimes, ex);
385-
if (ex instanceof ObTableRoutingWrongException) {
386-
obTableClient.getOrRefreshTableEntry(tableName, true, obTableClient.isTableEntryRefreshIntervalWait(), true);
385+
if (ex instanceof ObTableNeedFetchAllException) {
386+
obTableClient.getOrRefreshTableEntry(tableName, true,
387+
obTableClient.isTableEntryRefreshIntervalWait(), true);
387388
// reset failure count while fetch all route info
388389
obTableClient.resetContinuousFailureByTableName(tableName);
389390
}

0 commit comments

Comments
 (0)