Skip to content

Commit d1c663f

Browse files
committed
add retry logic for common query
1 parent bec5329 commit d1c663f

File tree

4 files changed

+48
-37
lines changed

4 files changed

+48
-37
lines changed

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import java.util.concurrent.atomic.AtomicReference;
4646
import java.util.concurrent.locks.ReentrantLock;
4747

48+
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;
49+
4850
public abstract class AbstractQueryStreamResult extends AbstractPayload implements
4951
QueryStreamResult {
5052

@@ -61,14 +63,15 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
6163
// global index: key is index table name (be like: __idx_<data_table_id>_<index_name>)
6264
protected String indexTableName;
6365
protected ObTableEntityType entityType;
64-
protected Map<Long, ObPair<Long, ObTableParam>> expectant; // Map<logicId, ObPair<logicId, param>>
66+
protected Map<Long, ObPair<Long, ObTableParam>> expectant;
6567
protected List<String> cacheProperties = new LinkedList<String>();
6668
protected LinkedList<List<ObObj>> cacheRows = new LinkedList<List<ObObj>>();
6769
private LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>> partitionLastResult = new LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>>();
6870
private ObReadConsistency readConsistency = ObReadConsistency.STRONG;
6971
// ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT]
7072
public List<ObObj> currentStartKey;
71-
73+
protected ObTableClient client;
74+
7275
/*
7376
* Get pcode.
7477
*/
@@ -549,9 +552,32 @@ public void init() throws Exception {
549552
return;
550553
}
551554
if (tableQuery.getBatchSize() == -1) {
552-
for (Map.Entry<Long, ObPair<Long, ObTableParam>> entry : expectant.entrySet()) {
553-
// mark the refer partition
554-
referToNewPartition(entry.getValue());
555+
if (!expectant.isEmpty()) {
556+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
557+
.iterator();
558+
int retryTimes = 0;
559+
while (it.hasNext()) {
560+
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
561+
try {
562+
// try access new partition, async will not remove useless expectant
563+
referToNewPartition(entry.getValue());
564+
} catch (Exception e) {
565+
if (e instanceof ObTableNeedFetchAllException) {
566+
setExpectant(refreshPartition(tableQuery, tableName));
567+
it = expectant.entrySet().iterator();
568+
retryTimes++;
569+
if (retryTimes > client.getRuntimeRetryTimes()) {
570+
RUNTIME.error("Fail to get refresh table entry response after {}",
571+
retryTimes);
572+
throw new ObTableRetryExhaustedException(
573+
"Fail to get refresh table entry response after " + retryTimes);
574+
575+
}
576+
} else {
577+
throw e;
578+
}
579+
}
580+
}
555581
}
556582
expectant.clear();
557583
} else {
@@ -692,4 +718,19 @@ public ObReadConsistency getReadConsistency() {
692718
public void setReadConsistency(ObReadConsistency readConsistency) {
693719
this.readConsistency = readConsistency;
694720
}
721+
722+
/**
723+
* Get client.
724+
* @return client
725+
*/
726+
public ObTableClient getClient() {
727+
return client;
728+
}
729+
730+
/*
731+
* Set client.
732+
*/
733+
public void setClient(ObTableClient client) {
734+
this.client = client;
735+
}
695736
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult {
4545
private static final Logger logger = LoggerFactory
4646
.getLogger(ObTableClientQueryStreamResult.class);
47-
protected ObTableClient client;
4847
private boolean isEnd = true;
4948
private long sessionId = Constants.OB_INVALID_ID;
5049
private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest();
@@ -338,19 +337,7 @@ public void close() throws Exception {
338337
closeLastStreamResult(lastEntry.getValue());
339338
}
340339
}
341-
342-
public ObTableClient getClient() {
343-
return client;
344-
}
345-
346-
/**
347-
* Set client.
348-
* @param client client want to set
349-
*/
350-
public void setClient(ObTableClient client) {
351-
this.client = client;
352-
}
353-
340+
354341
public boolean isEnd() {
355342
return isEnd;
356343
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,8 @@
3434
import java.util.concurrent.atomic.AtomicReference;
3535

3636
public class ObTableClientQueryStreamResult extends AbstractQueryStreamResult {
37-
3837
private static final Logger logger = TableClientLoggerFactory
3938
.getLogger(ObTableClientQueryStreamResult.class);
40-
protected ObTableClient client;
4139

4240
protected ObTableQueryResult referToNewPartition(ObPair<Long, ObTableParam> partIdWithObTable)
4341
throws Exception {
@@ -84,19 +82,4 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
8482
throws Exception {
8583
return buildPartitions(client, tableQuery, tableName);
8684
}
87-
88-
/**
89-
* Get client.
90-
* @return client
91-
*/
92-
public ObTableClient getClient() {
93-
return client;
94-
}
95-
96-
/*
97-
* Set client.
98-
*/
99-
public void setClient(ObTableClient client) {
100-
this.client = client;
101-
}
10285
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ private void executeWithRetries(
631631
Map<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>>> currentPartitions = new HashMap<>();
632632
currentPartitions.put(entry.getKey(), entry.getValue());
633633

634-
while (retryCount < maxRetries && !success) {
634+
while (retryCount <= maxRetries && !success) {
635635
boolean allPartitionsSuccess = true;
636636

637637
for (Map.Entry<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>>> currentEntry : currentPartitions.entrySet()) {

0 commit comments

Comments
 (0)