Skip to content

Commit 78ada50

Browse files
committed
[fix] multi-cf retry table not exist (#229)
1 parent c9fcdd2 commit 78ada50

File tree

4 files changed

+49
-28
lines changed

4 files changed

+49
-28
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3571,6 +3571,15 @@ public String getPhyTableNameFromTableGroup(ObTableQueryRequest request, String
35713571
return tableName;
35723572
}
35733573

3574+
public String getPhyTableNameFromTableGroup(ObTableEntityType type, String tableName) throws Exception {
3575+
if (odpMode) {
3576+
// do nothing
3577+
} else if (type == ObTableEntityType.HKV && isTableGroupName(tableName)) {
3578+
tableName = tryGetTableNameFromTableGroupCache(tableName, false);
3579+
}
3580+
return tableName;
3581+
}
3582+
35743583
/*
35753584
* Get the start keys of different tablets, byte[0] = [] = EMPTY_START_ROW = EMPTY_END_ROW
35763585
* Example:

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,14 @@ public void addTabletOperation(ObTableTabletOp tabletOp) {
121121
public void setLsOperation(ObTableLSOperation lsOperation) {
122122
this.lsOperation = lsOperation;
123123
}
124-
124+
125+
/*
126+
* Get entity type.
127+
*/
128+
public ObTableEntityType getEntityType() {
129+
return entityType;
130+
}
131+
125132
/*
126133
* Set entity type.
127134
*/

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package com.alipay.oceanbase.rpc.stream;
1919

20+
import com.alipay.oceanbase.rpc.ObGlobal;
2021
import com.alipay.oceanbase.rpc.ObTableClient;
2122
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2223
import com.alipay.oceanbase.rpc.exception.ObTableException;
2324
import com.alipay.oceanbase.rpc.exception.ObTableNeedFetchAllException;
2425
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
26+
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2527
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2628
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2729
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -80,14 +82,15 @@ public void init() throws Exception {
8082
} catch (Exception e) {
8183
if (e instanceof ObTableNeedFetchAllException) {
8284
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
83-
.getTableQuery(), tableName));
85+
.getTableQuery(), client.getPhyTableNameFromTableGroup(entityType, tableName)));
8486
it = expectant.entrySet().iterator();
8587
retryTimes++;
8688
if (retryTimes > maxRetries) {
8789
RUNTIME.error("Fail to get refresh table entry response after {}",
8890
retryTimes);
8991
throw new ObTableRetryExhaustedException(
90-
"Fail to get refresh table entry response after " + retryTimes);
92+
"Fail to get refresh table entry response after " + retryTimes
93+
+ "errorCode:" + ((ObTableNeedFetchAllException) e).getErrorCode());
9194

9295
}
9396
} else {
@@ -201,10 +204,11 @@ public boolean next() throws Exception {
201204
referToLastStreamResult(lastEntry.getValue());
202205
} catch (Exception e) {
203206
if (e instanceof ObTableNeedFetchAllException) {
207+
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
204208
this.asyncRequest.getObTableQueryRequest().getTableQuery()
205209
.adjustStartKey(currentStartKey);
206210
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
207-
.getTableQuery(), tableName));
211+
.getTableQuery(), realTableName));
208212
setEnd(true);
209213
} else {
210214
throw e;
@@ -232,10 +236,11 @@ public boolean next() throws Exception {
232236
referToNewPartition(entry.getValue());
233237
} catch (Exception e) {
234238
if (e instanceof ObTableNeedFetchAllException) {
239+
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
235240
this.asyncRequest.getObTableQueryRequest().getTableQuery()
236241
.adjustStartKey(currentStartKey);
237242
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
238-
.getTableQuery(), tableName));
243+
.getTableQuery(), realTableName));
239244
it = expectant.entrySet().iterator();
240245
retryTimes++;
241246
if (retryTimes > client.getTableEntryRefreshTryTimes()) {

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -360,18 +360,17 @@ public Map<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSing
360360
false, false, obTableClient.getRoute(false));
361361
long lsId = tableObPair.getRight().getLsId();
362362

363-
Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>> tabletOperations
363+
Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>> tabletOperations
364364
= lsOperationsMap.computeIfAbsent(lsId, k -> new HashMap<>());
365-
// if ls id not exists
365+
// if ls id not exists
366366

367-
ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>> singleOperations =
367+
ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>> singleOperations =
368368
tabletOperations.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(tableObPair.getRight(), new ArrayList<>()));
369-
// if tablet id not exists
370-
371-
singleOperations.getRight().add(new ObPair<>(i, operation));
372-
}
369+
// if tablet id not exists
370+
singleOperations.getRight().add(new ObPair<>(i, operation));
371+
}
373372

374-
return lsOperationsMap;
373+
return lsOperationsMap;
375374
}
376375

377376
public Map<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>>> partitionPrepare()
@@ -447,7 +446,8 @@ public void partitionExecute(ObTableSingleOpResult[] results,
447446
long startExecute = System.currentTimeMillis();
448447
Set<String> failedServerList = null;
449448
ObServerRoute route = null;
450-
449+
// maybe get real table name
450+
String realTableName = obTableClient.getPhyTableNameFromTableGroup(tableLsOpRequest.getEntityType(), tableName);
451451
while (true) {
452452
obTableClient.checkStatus();
453453
long currentExecute = System.currentTimeMillis();
@@ -456,7 +456,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
456456
logger.error("table name: {} ls id:{} it has tried " + tryTimes
457457
+ " times and it has waited " + costMillis + " ms"
458458
+ " which exceeds runtime max wait timeout "
459-
+ obTableClient.getRuntimeMaxWait() + " ms", tableName, lsId);
459+
+ obTableClient.getRuntimeMaxWait() + " ms", realTableName, lsId);
460460
throw new ObTableTimeoutExcetion("it has tried " + tryTimes
461461
+ " times and it has waited " + costMillis
462462
+ "ms which exceeds runtime max wait timeout "
@@ -474,15 +474,15 @@ public void partitionExecute(ObTableSingleOpResult[] results,
474474
if (failedServerList != null) {
475475
route.setBlackList(failedServerList);
476476
}
477-
subObTable = obTableClient.getTableWithPartId(tableName, originPartId, needRefreshTableEntry,
477+
subObTable = obTableClient.getTableWithPartId(realTableName, originPartId, needRefreshTableEntry,
478478
obTableClient.isTableEntryRefreshIntervalWait(), false, route).
479479
getRight().getObTable();
480480
}
481481
}
482482
ObPayload result = subObTable.execute(tableLsOpRequest);
483483
if (result != null && result.getPcode() == Pcodes.OB_TABLE_API_MOVE) {
484484
ObTableApiMove moveResponse = (ObTableApiMove) result;
485-
obTableClient.getRouteTableRefresher().addTableIfAbsent(tableName, true);
485+
obTableClient.getRouteTableRefresher().addTableIfAbsent(realTableName, true);
486486
obTableClient.getRouteTableRefresher().triggerRefreshTable();
487487
subObTable = obTableClient.getTable(moveResponse);
488488
result = subObTable.execute(tableLsOpRequest);
@@ -494,17 +494,17 @@ public void partitionExecute(ObTableSingleOpResult[] results,
494494
}
495495
}
496496
subLSOpResult = (ObTableLSOpResult) result;
497-
obTableClient.resetExecuteContinuousFailureCount(tableName);
497+
obTableClient.resetExecuteContinuousFailureCount(realTableName);
498498
break;
499499
} catch (Exception ex) {
500500
if (obTableClient.isOdpMode()) {
501501
logger.warn("meet exception when execute ls batch in odp mode." +
502-
"tablename: {}, errMsg: {}", tableName, ex.getMessage());
502+
"tablename: {}, errMsg: {}", realTableName, ex.getMessage());
503503
throw ex;
504504
} else if (ex instanceof ObTableReplicaNotReadableException) {
505505
if ((tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) {
506506
logger.warn("tablename:{} ls id:{} retry when replica not readable: {}",
507-
tableName, lsId, ex.getMessage());
507+
realTableName, lsId, ex.getMessage());
508508
if (failedServerList == null) {
509509
failedServerList = new HashSet<String>();
510510
}
@@ -517,23 +517,23 @@ public void partitionExecute(ObTableSingleOpResult[] results,
517517
&& ((ObTableException) ex).isNeedRefreshTableEntry()) {
518518
needRefreshTableEntry = true;
519519
logger.warn("tablename:{} ls id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}",
520-
tableName, lsId, ((ObTableException) ex).getErrorCode(), ex);
520+
realTableName, lsId, ((ObTableException) ex).getErrorCode(), ex);
521521
if (obTableClient.isRetryOnChangeMasterTimes()
522522
&& (tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) {
523523
logger.warn("tablename:{} ls id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}",
524-
tableName, lsId, ((ObTableException) ex).getErrorCode(),
524+
realTableName, lsId, ((ObTableException) ex).getErrorCode(),
525525
tryTimes, ex);
526526
if (ex instanceof ObTableNeedFetchAllException) {
527-
obTableClient.getOrRefreshTableEntry(tableName, needRefreshTableEntry,
527+
obTableClient.getOrRefreshTableEntry(realTableName, needRefreshTableEntry,
528528
obTableClient.isTableEntryRefreshIntervalWait(), true);
529529
throw ex;
530530
}
531531
} else {
532-
obTableClient.calculateContinuousFailure(tableName, ex.getMessage());
532+
obTableClient.calculateContinuousFailure(realTableName, ex.getMessage());
533533
throw ex;
534534
}
535535
} else {
536-
obTableClient.calculateContinuousFailure(tableName, ex.getMessage());
536+
obTableClient.calculateContinuousFailure(realTableName, ex.getMessage());
537537
throw ex;
538538
}
539539
}
@@ -544,7 +544,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
544544

545545
if (subLSOpResult == null) {
546546
RUNTIME.error("table name:{} ls id:{} check batch operation result error: client get unexpected NULL result",
547-
tableName, lsId);
547+
realTableName, lsId);
548548
throw new ObTableUnexpectedException("check batch operation result error: client get unexpected NULL result");
549549
}
550550

@@ -607,7 +607,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
607607

608608

609609
String endpoint = subObTable.getIp() + ":" + subObTable.getPort();
610-
MonitorUtil.info(tableLsOpRequest, subObTable.getDatabase(), tableName,
610+
MonitorUtil.info(tableLsOpRequest, subObTable.getDatabase(), realTableName,
611611
"LS_BATCH-Execute-", endpoint, tableLsOp,
612612
affectedRows, endExecute - startExecute,
613613
obTableClient.getslowQueryMonitorThreshold());
@@ -745,7 +745,7 @@ public void doTask() {
745745
// Execute sub-batch operation one by one
746746
for (final Map.Entry<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>>> entry : lsOperations
747747
.entrySet()) {
748-
partitionExecute(obTableOperationResults, entry);
748+
executeWithRetries(obTableOperationResults, entry, maxRetries);
749749
}
750750
}
751751

0 commit comments

Comments
 (0)