Skip to content

Commit 642ad45

Browse files
authored
Fix compatibility problem caused by getPartition in ODP mode (#273)
* revert odp operations to old client version, do not send tablet_id to odp * retry one time in odp mode when query with part id * correct holdTime when fetchAll is true when refreshing tableEntry * adapt master after rebase * remove useless assert
1 parent 088b828 commit 642ad45

File tree

5 files changed

+68
-116
lines changed

5 files changed

+68
-116
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 23 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import static com.alipay.oceanbase.rpc.location.model.TableEntry.HBASE_ROW_KEY_ELEMENT;
6363
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartIdCalculator.*;
6464
import static com.alipay.oceanbase.rpc.property.Property.*;
65-
import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.OB_ERR_KV_ROUTE_ENTRY_EXPIRE;
6665
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*;
6766
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*;
6867
import static java.lang.String.format;
@@ -618,7 +617,6 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
618617
throw new IllegalArgumentException("table name is null");
619618
}
620619
boolean needRefreshTableEntry = false;
621-
boolean needRenew = false;
622620
boolean needFetchAllRouteInfo = false;
623621
int tryTimes = 0;
624622
long startExecute = System.currentTimeMillis();
@@ -636,7 +634,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
636634
ObPair<Long, ObTableParam> obPair = null;
637635
try {
638636
if (odpMode) {
639-
obPair = getODPTableWithRowKeyValue(tableName, callback.getRowKey(), needRenew);
637+
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
640638
} else {
641639
obPair = getTable(tableName, callback.getRowKey(),
642640
needRefreshTableEntry, tableEntryRefreshIntervalWait,
@@ -654,17 +652,9 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
654652
"execute while meet Exception, errorCode: {} , errorMsg: {}, try times {}",
655653
((ObTableException) ex).getErrorCode(), ex.getMessage(),
656654
tryTimes);
657-
// if the cause is that ODP partition meta have expired, try to fetch new one
658-
if (ex instanceof ObTablePartitionChangeException
659-
&& ((ObTablePartitionChangeException) ex).getErrorCode() == OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
660-
needRenew = true;
661-
} else {
662-
throw ex;
663-
}
664655
} else {
665656
logger.warn("execute while meet Exception, errorMsg: {}, try times {}",
666657
ex.getMessage(), tryTimes);
667-
throw ex;
668658
}
669659
} else {
670660
RUNTIME.error("retry failed with exception", ex);
@@ -686,7 +676,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
686676
} else if (ex instanceof ObTableException
687677
&& ((ObTableException) ex).isNeedRefreshTableEntry()) {
688678
needRefreshTableEntry = true;
689-
679+
690680
if (retryOnChangeMasterTimes && (tryTimes - 1) < runtimeRetryTimes) {
691681
if (ex instanceof ObTableNeedFetchAllException) {
692682
needFetchAllRouteInfo = true;
@@ -787,7 +777,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
787777
throw new IllegalArgumentException("table name is null");
788778
}
789779
boolean needRefreshTableEntry = false;
790-
boolean needRenew = false;
780+
boolean needFetchAllRouteInfo = false;
791781
int tryTimes = 0;
792782
long startExecute = System.currentTimeMillis();
793783
while (true) {
@@ -804,7 +794,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
804794
ObPair<Long, ObTableParam> obPair = null;
805795
try {
806796
if (odpMode) {
807-
obPair = getODPTableWithRowKey(tableName, callback.getRowKey(), needRenew);
797+
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
808798
} else {
809799
if (null != callback.getRowKey()) {
810800
// in the case of retry, the location always needs to be refreshed here
@@ -837,20 +827,10 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
837827
"execute while meet Exception, errorCode: {} , errorMsg: {}, try times {}",
838828
((ObTableException) ex).getErrorCode(), ex.getMessage(),
839829
tryTimes);
840-
// if the cause is that ODP partition meta have expired, try to fetch new one
841-
if (ex instanceof ObTablePartitionChangeException
842-
&& ((ObTablePartitionChangeException) ex).getErrorCode() == OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
843-
needRenew = true;
844-
} else {
845-
RUNTIME.error("execute while meet exception", ex);
846-
throw ex;
847-
}
848830
} else {
849831
logger.warn(
850-
"execute while meet Exception, exception: {}, try times {}", ex,
851-
tryTimes);
852-
RUNTIME.error("execute while meet exception", ex);
853-
throw ex;
832+
"execute while meet Exception, exception: {}, try times {}", ex,
833+
tryTimes);
854834
}
855835
} else {
856836
RUNTIME.error("retry failed with exception", ex);
@@ -870,7 +850,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
870850
}
871851
} else if (ex instanceof ObTableException
872852
&& ((ObTableException) ex).isNeedRefreshTableEntry()) {
873-
// if the problem is the lack of row key name, throw directly
853+
// if the problem is the lack of row key name, throw directly
874854
if (tableRowKeyElement.get(tableName) == null) {
875855
logger.warn("tableRowKeyElement not found table name: {}", ex.getMessage());
876856
RUNTIME.error("tableRowKeyElement not found table name", ex);
@@ -880,7 +860,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
880860
if (retryOnChangeMasterTimes && (tryTimes - 1) < runtimeRetryTimes) {
881861
if (ex instanceof ObTableNeedFetchAllException) {
882862
getOrRefreshTableEntry(tableName, true, true, true);
883-
// reset failure count while fetch all route info
863+
// reset failure count while fetch all route info
884864
this.resetExecuteContinuousFailureCount(tableName);
885865
}
886866
} else {
@@ -1232,7 +1212,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
12321212
if ((fetchAll && (fetchAllInterval < punishInterval))
12331213
|| (!fetchAll && (interval < punishInterval))) {
12341214
if (waitForRefresh) {
1235-
long toHoldTime = punishInterval - interval;
1215+
long toHoldTime = fetchAll ? (punishInterval - fetchAllInterval) : (punishInterval - interval);
12361216
logger
12371217
.info(
12381218
"punish table entry {} : table entry refresh time {} punish interval {} current time {}. wait for refresh times {}ms",
@@ -1374,13 +1354,13 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
13741354
if (info == null) {
13751355
throw new ObTableEntryRefreshException("Partition info is null for tabletId=" + tabletId);
13761356
}
1377-
1357+
13781358
long lastRefreshTime = info.getLastUpdateTime();
13791359
long currentTime = System.currentTimeMillis();
13801360
if (currentTime - lastRefreshTime < tableEntryRefreshIntervalCeiling) {
13811361
return tableEntry;
13821362
}
1383-
1363+
13841364
Lock lock = info.refreshLock;
13851365
boolean acquired = false;
13861366
try {
@@ -1400,7 +1380,7 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
14001380
if (currentTime - lastRefreshTime < tableEntryRefreshIntervalCeiling) {
14011381
return tableEntry;
14021382
}
1403-
1383+
14041384
tableEntry = loadTableEntryLocationWithPriority(
14051385
serverRoster,
14061386
tableEntryKey,
@@ -1412,7 +1392,7 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
14121392
serverAddressCachingTimeout,
14131393
sysUA
14141394
);
1415-
1395+
14161396
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
14171397

14181398
} finally {
@@ -1430,7 +1410,7 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
14301410
RUNTIME.error(LCD.convert("01-00020"), tableEntryKey, tableEntry, e);
14311411
throw new ObTableEntryRefreshException(errorMsg, e);
14321412
}
1433-
1413+
14341414
tableLocations.put(tableName, tableEntry);
14351415
tableEntryRefreshContinuousFailureCount.set(0);
14361416
return tableEntry;
@@ -1655,7 +1635,7 @@ private ObPair<Long, ReplicaLocation> getPartitionReplica(TableEntry tableEntry,
16551635
private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
16561636
ObServerRoute route) {
16571637
// In all cases for 3.x and for non-partitioned tables in 4.x, partId will not change.
1658-
// If it is 4.x, it will be converted to tablet id.
1638+
// If it is 4.x, it will be converted to tablet id.
16591639
partId = getTabletIdByPartId(tableEntry, partId);
16601640
return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(partId)
16611641
.getReplica(route);
@@ -1980,9 +1960,9 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
19801960
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
19811961
replica = getPartitionLocation(obPartitionLocationInfo, route);
19821962
/**
1983-
* Normally, getOrRefreshPartitionInfo makes sure that a thread only continues if it finds the leader
1984-
* during a route refresh. But sometimes, there might not be a leader yet. In this case, the thread
1985-
* is released, and since it can't get the replica, it throws an no master exception.
1963+
* Normally, getOrRefreshPartitionInfo makes sure that a thread only continues if it finds the leader
1964+
* during a route refresh. But sometimes, there might not be a leader yet. In this case, the thread
1965+
* is released, and since it can't get the replica, it throws an no master exception.
19861966
*/
19871967
if (replica == null && obPartitionLocationInfo.getPartitionLocation().getLeader() == null) {
19881968
RUNTIME.error(LCD.convert("01-00028"), partitionId, tableEntry.getPartitionEntry(), tableEntry);
@@ -2023,14 +2003,14 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
20232003
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
20242004
}
20252005
}
2026-
2006+
20272007
if (ObGlobal.obVsnMajor() >= 4) {
20282008
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
20292009
replica = getPartitionLocation(obPartitionLocationInfo, route);
20302010
} else {
20312011
replica = getPartitionReplica(tableEntry, partitionId, route).getRight();
20322012
}
2033-
2013+
20342014
addr = replica.getAddr();
20352015
obTable = tableRoster.get(addr);
20362016

@@ -2334,8 +2314,9 @@ private List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuer
23342314
ObTableParam param = new ObTableParam(obTable);
23352315
param.setPartId(partId);
23362316
partId = getTabletIdByPartId(tableEntry, partId);
2337-
param.setLsId(tableEntry.getPartitionEntry().getPartitionInfo(partId).getTabletLsId());
2338-
2317+
if (ObGlobal.obVsnMajor() >= 4 && tableEntry != null) {
2318+
param.setLsId(tableEntry.getPartitionEntry().getPartitionInfo(partId).getTabletLsId());
2319+
}
23392320
param.setTableId(tableEntry.getTableId());
23402321
// real partition(tablet) id
23412322
param.setPartitionId(partId);

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
116116
ObPayload result;
117117
ObTable subObTable = partIdWithIndex.getRight().getObTable();
118118
boolean needRefreshTableEntry = false;
119-
boolean odpNeedRenew = false;
120119
int tryTimes = 0;
121120
long startExecute = System.currentTimeMillis();
122121
Set<String> failedServerList = null;
@@ -138,9 +137,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
138137
try {
139138
if (tryTimes > 1) {
140139
if (client.isOdpMode()) {
141-
subObTable = client
142-
.getODPTableWithPartId(tableName, partIdWithIndex.getLeft(),
143-
odpNeedRenew).getRight().getObTable();
140+
subObTable = client.getOdpTable();
144141
} else {
145142
if (route == null) {
146143
route = client.getReadRoute();
@@ -192,24 +189,16 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
192189
"tablename:{} stream query execute while meet Exception needing retry, errorCode: {}, errorMsg: {}, try times {}",
193190
indexTableName, ((ObTableException) e).getErrorCode(),
194191
e.getMessage(), tryTimes);
195-
if (e instanceof ObTablePartitionChangeException
196-
&& ((ObTablePartitionChangeException) e).getErrorCode() == ResultCodes.OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
197-
odpNeedRenew = true;
198-
} else {
199-
throw e;
200-
}
201192
} else if (e instanceof IllegalArgumentException) {
202193
logger
203194
.warn(
204195
"tablename:{} stream query execute while meet Exception needing retry, try times {}, errorMsg: {}",
205196
indexTableName, tryTimes, e.getMessage());
206-
throw e;
207197
} else {
208198
logger
209199
.warn(
210200
"tablename:{} stream query execute while meet Exception needing retry, try times {}",
211201
indexTableName, tryTimes, e);
212-
throw e;
213202
}
214203
} else {
215204
throw e;

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -262,20 +262,12 @@ public Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>>
262262
for (int j = 0; j < rowKeySize; j++) {
263263
rowKey[j] = rowKeyObject.getObj(j).getValue();
264264
}
265-
ObPair<Long, ObTableParam> tableObPair = null;
266-
if (!obTableClient.isOdpMode()) {
267-
tableObPair = obTableClient.getTable(tableName, rowKey,
268-
false, false, obTableClient.getRoute(batchOperation.isReadOnly()));
269-
} else {
270-
tableObPair = obTableClient.getODPTableWithRowKeyValue(tableName, rowKey, false);
271-
}
272-
if (tableObPair == null) {
273-
throw new ObTableUnexpectedException("fail to get table pair in batch");
274-
}
275-
final ObPair<Long, ObTableParam> tmpTableObPair = tableObPair;
265+
ObPair<Long, ObTableParam> tableObPair = obTableClient.getTable(
266+
tableName, rowKey, false, false,
267+
obTableClient.getRoute(batchOperation.isReadOnly()));
276268
ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>> obTableOperations = partitionOperationsMap
277-
.computeIfAbsent(tmpTableObPair.getLeft(), k -> new ObPair<>(
278-
tmpTableObPair.getRight(), new ArrayList<>()));
269+
.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(
270+
tableObPair.getRight(), new ArrayList<>()));
279271
obTableOperations.getRight().add(new ObPair<>(i, operation));
280272
}
281273
return partitionOperationsMap;
@@ -351,16 +343,12 @@ public void partitionExecute(ObTableOperationResult[] results,
351343
}
352344
tryTimes++;
353345
try {
354-
if (tryTimes > 1) {
355-
if (obTableClient.isOdpMode()) {
356-
ObTableParam newParam = obTableClient.getODPTableWithPartId(tableName,
357-
originPartId, odpNeedRenew).getRight();
358-
subObTable = newParam.getObTable();
359-
subRequest.setPartitionId(newParam.getPartitionId());
360-
subRequest.setTableId(newParam.getTableId());
361-
} else {
362-
// getTable() when we need retry
363-
// we should use partIdx to get table
346+
if (obTableClient.isOdpMode()) {
347+
subObTable = obTableClient.getOdpTable();
348+
} else {
349+
// getTable() when we need retry
350+
// we should use partIdx to get table
351+
if (tryTimes > 1) {
364352
if (route == null) {
365353
route = obTableClient.getRoute(batchOperation.isReadOnly());
366354
}
@@ -408,12 +396,7 @@ public void partitionExecute(ObTableOperationResult[] results,
408396
"batch ops execute while meet Exception, tablename:{}, errorMsg: {}, try times {}",
409397
tableName, ex.getMessage(),
410398
tryTimes);
411-
if (ex instanceof ObTablePartitionChangeException
412-
&& ((ObTablePartitionChangeException) ex).getErrorCode() == ResultCodes.OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
413-
odpNeedRenew = true;
414-
}
415399
} else {
416-
RUNTIME.error("retry fail when normal batch executing", ex);
417400
throw ex;
418401
}
419402
} else if (ex instanceof ObTableReplicaNotReadableException) {

0 commit comments

Comments
 (0)