Skip to content

Commit 7765ad1

Browse files
committed
retry LSBatchImpl
1 parent 8bb9560 commit 7765ad1

File tree

1 file changed

+54
-10
lines changed

1 file changed

+54
-10
lines changed

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public void addOperation(Mutation mutation) throws Exception {
279279
* Execute.
280280
*/
281281
public List<Object> execute() throws Exception {
282-
List<Object> results = new ArrayList(batchOperation.size());
282+
List<Object> results = new ArrayList<>(batchOperation.size());
283283
for (ObTableSingleOpResult result : executeInternal()) {
284284
int errCode = result.getHeader().getErrno();
285285
if (errCode == ResultCodes.OB_SUCCESS.errorCode) {
@@ -321,10 +321,12 @@ private Object[] calculateRowKey(ObTableSingleOp operation) {
321321
return rowKey;
322322
}
323323

324-
private List<ObTableSingleOp> extractOperations(List<ObPair<Integer, ObTableSingleOp>> operationsPairs) throws Exception {
325-
List<ObTableSingleOp> operations = new ArrayList<>(operationsPairs.size());
326-
for (ObPair<Integer, ObTableSingleOp> pair : operationsPairs) {
327-
operations.add(pair.getRight());
324+
private List<ObTableSingleOp> extractOperations(Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>> tabletOperationsMap) {
325+
List<ObTableSingleOp> operations = new ArrayList<>();
326+
for (ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>> pair : tabletOperationsMap.values()) {
327+
for (ObPair<Integer, ObTableSingleOp> operationWithIndex : pair.getRight()) {
328+
operations.add(operationWithIndex.getRight());
329+
}
328330
}
329331
return operations;
330332
}
@@ -541,7 +543,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
541543
long endExecute = System.currentTimeMillis();
542544

543545
if (subLSOpResult == null) {
544-
RUNTIME.error("tablename:{} ls id:{} check batch operation result error: client get unexpected NULL result",
546+
RUNTIME.error("table name:{} ls id:{} check batch operation result error: client get unexpected NULL result",
545547
tableName, lsId);
546548
throw new ObTableUnexpectedException("check batch operation result error: client get unexpected NULL result");
547549
}
@@ -615,11 +617,51 @@ private boolean shouldRetry(Throwable throwable) {
615617
return throwable instanceof ObTableNeedFetchAllException;
616618
}
617619

620+
private void executeWithRetries(
621+
ObTableSingleOpResult[] results,
622+
Map.Entry<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>>> entry,
623+
int maxRetries) throws Exception {
624+
625+
int retryCount = 0;
626+
boolean success = false;
627+
628+
// 初始分区
629+
Map<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>>> currentPartitions = new HashMap<>();
630+
currentPartitions.put(entry.getKey(), entry.getValue());
631+
632+
while (retryCount < maxRetries && !success) {
633+
boolean allPartitionsSuccess = true;
634+
635+
for (Map.Entry<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>>> currentEntry : currentPartitions.entrySet()) {
636+
try {
637+
partitionExecute(results, currentEntry);
638+
} catch (Exception e) {
639+
if (shouldRetry(e)) {
640+
retryCount++;
641+
List<ObTableSingleOp> failedOperations = extractOperations(currentEntry.getValue());
642+
currentPartitions = prepareOperations(failedOperations);
643+
allPartitionsSuccess = false;
644+
break;
645+
} else {
646+
throw e;
647+
}
648+
}
649+
}
650+
651+
if (allPartitionsSuccess) {
652+
success = true;
653+
}
654+
}
655+
656+
if (!success) {
657+
throw new ObTableUnexpectedException("Failed to execute operation after retrying " + maxRetries + " times.");
658+
}
659+
}
660+
618661
/*
619662
* Execute internal.
620663
*/
621664
public ObTableSingleOpResult[] executeInternal() throws Exception {
622-
final int maxRetries = obTableClient.getRuntimeRetryTimes();
623665
if (tableName == null || tableName.isEmpty()) {
624666
throw new IllegalArgumentException("table name is null");
625667
}
@@ -633,6 +675,8 @@ public ObTableSingleOpResult[] executeInternal() throws Exception {
633675
Map<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>>> lsOperations = partitionPrepare();
634676
long getTableTime = System.currentTimeMillis();
635677
final Map<Object, Object> context = ThreadLocalMap.getContextMap();
678+
final int maxRetries = obTableClient.getRuntimeRetryTimes();
679+
636680
if (executorService != null && !executorService.isShutdown() && lsOperations.size() > 1) {
637681
// execute sub-batch operation in parallel
638682
final ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(executorService,
@@ -648,7 +692,7 @@ public ObTableSingleOpResult[] executeInternal() throws Exception {
648692
public void doTask() {
649693
try {
650694
ThreadLocalMap.transmitContextMap(context);
651-
partitionExecute(finalObTableOperationResults, entry);
695+
executeWithRetries(finalObTableOperationResults, entry, maxRetries);
652696
} catch (Exception e) {
653697
logger.error(LCD.convert("01-00026"), e);
654698
executor.collectExceptions(e);
@@ -669,7 +713,7 @@ public void doTask() {
669713
"Batch Concurrent Execute interrupted", e);
670714
}
671715

672-
if (executor.getThrowableList().size() > 0) {
716+
if (!executor.getThrowableList().isEmpty()) {
673717
throw new ObTableUnexpectedException("Batch Concurrent Execute Error",
674718
executor.getThrowableList().get(0));
675719
}
@@ -682,7 +726,7 @@ public void doTask() {
682726
executor.stop();
683727
}
684728

685-
if (executor.getThrowableList().size() > 0) {
729+
if (!executor.getThrowableList().isEmpty()) {
686730
throw new ObTableUnexpectedException("Batch Concurrent Execute Error", executor
687731
.getThrowableList().get(0));
688732
}

0 commit comments

Comments
 (0)