Skip to content

Commit 8490fee

Browse files
committed
fix batch retry
1 parent 29c367a commit 8490fee

File tree

4 files changed

+176
-105
lines changed

4 files changed

+176
-105
lines changed

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObRangePartDesc.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public ObRangePartDesc() {
6666
public List<ObObjType> getOrderedCompareColumnTypes() {
6767
return orderedCompareColumnTypes;
6868
}
69-
private List<Long> completeWorks;
69+
70+
private List<Long> completeWorks;
7071

7172
/*
7273
* Set ordered compare column types.
@@ -299,20 +300,20 @@ public int getBoundsIdx(boolean isScan, Row rowKey) {
299300
try {
300301
List<Object> evalParams = evalRowKeyValues(rowKey);
301302
List<Comparable> comparableElement = super.initComparableElementByTypes(evalParams,
302-
this.orderedCompareColumns);
303+
this.orderedCompareColumns);
303304
ObPartitionKey searchKey = ObPartitionKey.getInstance(orderedCompareColumns,
304-
comparableElement);
305+
comparableElement);
305306

306307
int pos = upperBound(this.bounds, new ObComparableKV<ObPartitionKey, Long>(searchKey,
307-
(long) -1));
308+
(long) -1));
308309
if (pos >= this.bounds.size()) {
309310
if (isScan) {
310311
// if range is bigger than rangeMax while scanning
311312
// we just scan until last range
312313
return this.bounds.size() - 1;
313314
}
314315
throw new ArrayIndexOutOfBoundsException("Table has no partition for value in "
315-
+ this.getPartExpr());
316+
+ this.getPartExpr());
316317
} else {
317318
return pos;
318319
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java

Lines changed: 118 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -217,25 +217,41 @@ public List<Object> executeWithResult() throws Exception {
217217
return results;
218218
}
219219

220-
public Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> partitionPrepare()
221-
throws Exception {
222-
// consistent can not be sure
223-
List<ObTableOperation> operations = batchOperation.getTableOperations();
224-
Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> partitionOperationsMap = new HashMap<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>>();
220+
// Helper method to calculate RowKey from ObTableOperation
221+
private Object[] calculateRowKey(ObTableOperation operation) {
222+
ObRowKey rowKeyObject = operation.getEntity().getRowKey();
223+
int rowKeySize = rowKeyObject.getObjs().size();
224+
Object[] rowKey = new Object[rowKeySize];
225+
for (int j = 0; j < rowKeySize; j++) {
226+
rowKey[j] = rowKeyObject.getObj(j).getValue();
227+
}
228+
return rowKey;
229+
}
225230

231+
public List<ObTableOperation> extractOperations(List<ObPair<Integer, ObTableOperation>> operationsPairs) {
232+
List<ObTableOperation> operations = new ArrayList<>(operationsPairs.size());
233+
for (ObPair<Integer, ObTableOperation> pair : operationsPairs) {
234+
operations.add(pair.getRight());
235+
}
236+
return operations;
237+
}
238+
239+
public Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> prepareOperations(List<ObTableOperation> operations) throws Exception {
240+
Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> partitionOperationsMap = new HashMap<>();
241+
226242
if (obTableClient.isOdpMode()) {
227-
ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>> obTableOperations = new ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>(
228-
new ObTableParam(obTableClient.getOdpTable()),
229-
new ArrayList<ObPair<Integer, ObTableOperation>>());
243+
ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>> obTableOperations = new ObPair<>(
244+
new ObTableParam(obTableClient.getOdpTable()),
245+
new ArrayList<>());
230246
for (int i = 0; i < operations.size(); i++) {
231247
ObTableOperation operation = operations.get(i);
232248
obTableOperations.getRight().add(
233-
new ObPair<Integer, ObTableOperation>(i, operation));
249+
new ObPair<>(i, operation));
234250
}
235251
partitionOperationsMap.put(0L, obTableOperations);
236252
return partitionOperationsMap;
237253
}
238-
254+
239255
for (int i = 0; i < operations.size(); i++) {
240256
ObTableOperation operation = operations.get(i);
241257
ObRowKey rowKeyObject = operation.getEntity().getRowKey();
@@ -248,18 +264,20 @@ public Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>>
248264
tableName, rowKey, false, false,
249265
obTableClient.getRoute(batchOperation.isReadOnly()));
250266
ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>> obTableOperations = partitionOperationsMap
251-
.get(tableObPair.getLeft());
252-
if (obTableOperations == null) {
253-
obTableOperations = new ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>(
254-
tableObPair.getRight(), new ArrayList<ObPair<Integer, ObTableOperation>>());
255-
partitionOperationsMap.put(tableObPair.getLeft(), obTableOperations);
256-
}
257-
obTableOperations.getRight().add(new ObPair<Integer, ObTableOperation>(i, operation));
267+
.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(
268+
tableObPair.getRight(), new ArrayList<>()));
269+
obTableOperations.getRight().add(new ObPair<>(i, operation));
258270
}
259-
260271
return partitionOperationsMap;
261272
}
262273

274+
public Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> partitionPrepare()
275+
throws Exception {
276+
// consistent can not be sure
277+
List<ObTableOperation> operations = batchOperation.getTableOperations();
278+
return prepareOperations(operations);
279+
}
280+
263281
/*
264282
* Partition execute.
265283
*/
@@ -363,6 +381,7 @@ public void partitionExecute(ObTableOperationResult[] results,
363381
} catch (Exception ex) {
364382
if (obTableClient.isOdpMode()) {
365383
if ((tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) {
384+
assert ex instanceof ObTableException;
366385
logger
367386
.warn(
368387
"batch ops execute while meet Exception, tablename:{}, errorCode: {} , errorMsg: {}, try times {}",
@@ -399,9 +418,9 @@ public void partitionExecute(ObTableOperationResult[] results,
399418
tableName, partId, ((ObTableException) ex).getErrorCode(),
400419
tryTimes, ex);
401420
if (ex instanceof ObTableNeedFetchAllException) {
402-
needFetchAllRouteInfo = true;
403-
// reset failure count while fetch all route info
404-
obTableClient.resetExecuteContinuousFailureCount(tableName);
421+
obTableClient.getOrRefreshTableEntry(tableName, needRefreshTableEntry,
422+
obTableClient.isTableEntryRefreshIntervalWait(), true);
423+
throw ex;
405424
}
406425
} else {
407426
obTableClient.calculateContinuousFailure(tableName, ex.getMessage());
@@ -483,50 +502,96 @@ public void partitionExecute(ObTableOperationResult[] results,
483502
obTableClient.getslowQueryMonitorThreshold());
484503
}
485504

486-
/*
487-
* Execute internal.
488-
*/
489-
public ObTableBatchOperationResult executeInternal() throws Exception {
505+
private boolean shouldRetry(Throwable throwable) {
506+
return throwable instanceof ObTableNeedFetchAllException;
507+
}
490508

509+
private void executeWithRetries(ObTableOperationResult[] results, Map.Entry<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> entry, int maxRetries) throws Exception {
510+
int retryCount = 0;
511+
boolean success = false;
512+
513+
// 初始分区
514+
Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> currentPartitions = new HashMap<>();
515+
currentPartitions.put(entry.getKey(), entry.getValue());
516+
517+
while (retryCount < maxRetries && !success) {
518+
boolean allPartitionsSuccess = true;
519+
520+
for (Map.Entry<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> currentEntry : currentPartitions.entrySet()) {
521+
try {
522+
partitionExecute(results, currentEntry);
523+
} catch (Exception e) {
524+
if (shouldRetry(e)) {
525+
retryCount++;
526+
List<ObTableOperation> failedOperations = extractOperations(currentEntry.getValue().getRight());
527+
currentPartitions = prepareOperations(failedOperations);
528+
System.out.println("refresh partitions: currentPartitions: " + currentPartitions);
529+
allPartitionsSuccess = false;
530+
break;
531+
} else {
532+
throw e;
533+
}
534+
}
535+
}
536+
537+
if (allPartitionsSuccess) {
538+
success = true;
539+
}
540+
}
541+
542+
if (!success) {
543+
throw new ObTableUnexpectedException("Failed to execute operation after retrying " + maxRetries + " times.");
544+
}
545+
}
546+
547+
public ObTableBatchOperationResult executeInternal() throws Exception {
491548
if (tableName == null || tableName.isEmpty()) {
492549
throw new IllegalArgumentException("table name is null");
493550
}
494551
long start = System.currentTimeMillis();
495552
List<ObTableOperation> operations = batchOperation.getTableOperations();
496-
ObTableOperationResult[] obTableOperationResults = null;
497-
if (returnOneResult) {
498-
obTableOperationResults = new ObTableOperationResult[1];
499-
} else {
500-
obTableOperationResults = new ObTableOperationResult[operations.size()];
501-
}
553+
ObTableOperationResult[] obTableOperationResults = returnOneResult ? new ObTableOperationResult[1]
554+
: new ObTableOperationResult[operations.size()];
502555

503556
Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> partitions = partitionPrepare();
504557
long getTableTime = System.currentTimeMillis();
505558
final Map<Object, Object> context = ThreadLocalMap.getContextMap();
559+
final int maxRetries = obTableClient.getRuntimeRetryTimes();
560+
561+
ConcurrentTaskExecutor executor = null;
506562
if (executorService != null && !executorService.isShutdown() && partitions.size() > 1) {
507-
final ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(executorService,
508-
partitions.size());
509-
for (final Map.Entry<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> entry : partitions
510-
.entrySet()) {
511-
ObTableOperationResult[] finalObTableOperationResults = obTableOperationResults;
512-
executor.execute(new ConcurrentTask() {
513-
/*
514-
* Do task.
515-
*/
516-
@Override
517-
public void doTask() {
518-
try {
519-
ThreadLocalMap.transmitContextMap(context);
520-
partitionExecute(finalObTableOperationResults, entry);
521-
} catch (Exception e) {
522-
logger.error(LCD.convert("01-00026"), e);
523-
executor.collectExceptions(e);
524-
} finally {
525-
ThreadLocalMap.reset();
563+
executor = new ConcurrentTaskExecutor(executorService, partitions.size());
564+
}
565+
for (final Map.Entry<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> entry : partitions
566+
.entrySet()) {
567+
try {
568+
if (executor != null) {
569+
// Concurrent execution
570+
ConcurrentTaskExecutor finalExecutor = executor;
571+
executor.execute(new ConcurrentTask() {
572+
@Override
573+
public void doTask() {
574+
try {
575+
ThreadLocalMap.transmitContextMap(context);
576+
executeWithRetries(obTableOperationResults, entry, maxRetries);
577+
} catch (Exception e) {
578+
logger.error(LCD.convert("01-00026"), e);
579+
finalExecutor.collectExceptions(e);
580+
} finally {
581+
ThreadLocalMap.reset();
582+
}
526583
}
527-
}
528-
});
584+
});
585+
} else {
586+
// Sequential execution
587+
executeWithRetries(obTableOperationResults, entry, maxRetries);
588+
}
589+
} catch (Exception e) {
590+
logger.error("Error executing retry: {}", entry.getKey(), e);
591+
throw e;
529592
}
593+
}
594+
if (executor != null) {
530595
long estimate = obTableClient.getRuntimeBatchMaxWait() * 1000L * 1000L;
531596
try {
532597
while (estimate > 0) {
@@ -538,7 +603,7 @@ public void doTask() {
538603
"Batch Concurrent Execute interrupted", e);
539604
}
540605

541-
if (executor.getThrowableList().size() > 0) {
606+
if (!executor.getThrowableList().isEmpty()) {
542607
throw new ObTableUnexpectedException("Batch Concurrent Execute Error",
543608
executor.getThrowableList().get(0));
544609
}
@@ -553,22 +618,11 @@ public void doTask() {
553618
executor.stop();
554619
}
555620

556-
if (executor.getThrowableList().size() > 0) {
557-
throw new ObTableUnexpectedException("Batch Concurrent Execute Error", executor
558-
.getThrowableList().get(0));
559-
}
560-
561621
if (!executor.isComplete()) {
562622
throw new ObTableUnexpectedException("Batch Concurrent Execute Error ["
563623
+ obTableClient.getRpcExecuteTimeout()
564624
+ "]/ms");
565625
}
566-
567-
} else {
568-
for (final Map.Entry<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>> entry : partitions
569-
.entrySet()) {
570-
partitionExecute(obTableOperationResults, entry);
571-
}
572626
}
573627

574628
ObTableBatchOperationResult batchOperationResult = new ObTableBatchOperationResult();

0 commit comments

Comments
 (0)