Skip to content

Commit f0de20d

Browse files
committed
fix(scan): prevent infinite retries on illegal partitions during splits
1 parent e2649e0 commit f0de20d

File tree

6 files changed

+174
-72
lines changed

6 files changed

+174
-72
lines changed

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
6565
protected LinkedList<List<ObObj>> cacheRows = new LinkedList<List<ObObj>>();
6666
private LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>> partitionLastResult = new LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>>();
6767
private ObReadConsistency readConsistency = ObReadConsistency.STRONG;
68-
public ObRowKey currentStartKey;
68+
public List<ObObj> currentStartKey;
6969

7070
/*
7171
* Get pcode.
@@ -302,7 +302,8 @@ public boolean next() throws Exception {
302302
// lastly, refer to the new partition
303303
boolean hasNext = false;
304304
List<Map.Entry<Long, ObPair<Long, ObTableParam>>> referPartition = new ArrayList<Map.Entry<Long, ObPair<Long, ObTableParam>>>();
305-
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet().iterator();
305+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
306+
.iterator();
306307
while (it.hasNext()) {
307308
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
308309
referPartition.add(entry);
@@ -311,7 +312,8 @@ public boolean next() throws Exception {
311312
referPartition.add(entry);
312313

313314
// Try accessing the new partition
314-
ObTableQueryResult tableQueryResult = (ObTableQueryResult) referToNewPartition(entry.getValue());
315+
ObTableQueryResult tableQueryResult = (ObTableQueryResult) referToNewPartition(entry
316+
.getValue());
315317

316318
if (tableQueryResult.getRowCount() == 0) {
317319
continue;
@@ -329,13 +331,13 @@ public boolean next() throws Exception {
329331

330332
// Reset the iterator to start over
331333
it = expectant.entrySet().iterator();
332-
referPartition.clear(); // Clear the referPartition if needed
334+
referPartition.clear(); // Clear the referPartition if needed
333335
} else {
334336
throw e;
335337
}
336338
}
337339
}
338-
340+
339341
for (Map.Entry<Long, ObPair<Long, ObTableParam>> entry : expectant.entrySet()) {
340342
// mark the refer partition
341343
referPartition.add(entry);
@@ -411,7 +413,7 @@ protected void nextRow() {
411413
rowIndex = rowIndex + 1;
412414
row = cacheRows.poll();
413415
if (row != null) {
414-
currentStartKey = (ObRowKey) row.get(0).getValue();
416+
currentStartKey = row;
415417
}
416418
}
417419

@@ -490,7 +492,10 @@ protected abstract ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdW
490492
protected abstract ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partIdWithObTable,
491493
ObPayload streamRequest)
492494
throws Exception;
493-
protected abstract Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery, String tableName) throws Exception;
495+
496+
protected abstract Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery,
497+
String tableName)
498+
throws Exception;
494499

495500
protected void cacheResultRows(ObTableQueryResult tableQueryResult) {
496501
cacheRows.addAll(tableQueryResult.getPropertiesRows());

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 20 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import com.alipay.oceanbase.rpc.util.Serialization;
2929
import io.netty.buffer.ByteBuf;
3030

31+
import java.util.Arrays;
3132
import java.util.LinkedList;
3233
import java.util.List;
3334

35+
import static com.alipay.oceanbase.rpc.util.ByteUtil.*;
3436
import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader;
3537
import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength;
3638

@@ -70,86 +72,44 @@ public class ObTableQuery extends AbstractPayload {
7072
private List<ObTableAggregationSingle> aggregations = new LinkedList<>();
7173

7274
private ObKVParams obKVParams;
73-
74-
public void adjustStartKey(ObRowKey key) throws IllegalArgumentException {
75+
76+
public void adjustStartKey(List<ObObj> key) throws IllegalArgumentException {
7577
List<ObNewRange> keyRanges = getKeyRanges();
7678
for (ObNewRange range : keyRanges) {
7779
if (isKeyInRange(range, key)) {
7880
byte[] bytes = parseStartKeyToBytes(key);
79-
ObRowKey newStartKey = ObRowKey.getInstance(new Object[]{incrementByteArray(bytes), ObObj.getMin(), ObObj.getMin()});
80-
range.setStartKey(newStartKey);
81+
if (getScanOrder() == ObScanOrder.Forward) {
82+
ObRowKey newStartKey = ObRowKey.getInstance(new Object[]{incrementByteArray(bytes), ObObj.getMin(), ObObj.getMin()});
83+
range.setStartKey(newStartKey);
84+
} else {
85+
ObRowKey newStartKey = ObRowKey.getInstance(new Object[]{decrementByteArray(bytes), ObObj.getMax(), ObObj.getMax()});
86+
range.setEndKey(newStartKey);
87+
}
8188
return;
8289
}
8390
}
8491
throw new IllegalArgumentException("Key not found in any KeyRange.");
8592
}
8693

87-
private byte[] parseStartKeyToBytes(ObRowKey key) {
88-
if (key == null || key.getObjs() == null || key.getObjs().isEmpty()) {
89-
return new byte[0];
90-
}
91-
92-
ObObj obObjKey = key.getObjs().get(0);
93-
if (obObjKey.getValue() instanceof byte[]) {
94-
return (byte[]) obObjKey.getValue();
95-
}
96-
97-
throw new IllegalArgumentException("The start key does not contain a byte[] value.");
94+
private byte[] parseStartKeyToBytes(List<ObObj> key) {
95+
ObObj obObjKey = key.get(0);
96+
return obObjKey.encode();
9897
}
9998

100-
private boolean isKeyInRange(ObNewRange range, ObRowKey key) {
101-
byte[] startKeyBytes = parseStartKeyToBytes(range.getStartKey());
102-
byte[] endKeyBytes = parseStartKeyToBytes(range.getEndKey());
99+
private boolean isKeyInRange(ObNewRange range, List<ObObj> key) {
100+
byte[] startKeyBytes = parseStartKeyToBytes(range.getStartKey().getObjs());
101+
byte[] endKeyBytes = parseStartKeyToBytes(range.getEndKey().getObjs());
103102
byte[] keyBytes = parseStartKeyToBytes(key);
104103

105104
int startComparison = compareByteArrays(startKeyBytes, keyBytes);
106105
int endComparison = compareByteArrays(endKeyBytes, keyBytes);
107-
108-
// 假设范围是 [start, end),那么需要:
109-
boolean withinStart = startComparison <= 0; // key >= start
110-
boolean withinEnd = endComparison > 0; // key < end
106+
107+
boolean withinStart = startComparison <= 0;
108+
boolean withinEnd = endComparison > 0;
111109

112110
return withinStart && withinEnd;
113111
}
114112

115-
private int compareByteArrays(byte[] array1, byte[] array2) {
116-
for (int i = 0; i < Math.min(array1.length, array2.length); i++) {
117-
int a = (array1[i] & 0xFF);
118-
int b = (array2[i] & 0xFF);
119-
if (a != b) {
120-
return a - b;
121-
}
122-
}
123-
return array1.length - array2.length;
124-
}
125-
126-
private byte[] incrementByteArray(byte[] input) {
127-
if (input == null || input.length == 0) {
128-
return new byte[]{1}; // 仅处理空输入情况
129-
}
130-
131-
byte[] result = input.clone(); // 复制数组以保持原数据不变
132-
133-
for (int i = result.length - 1; i >= 0; i--) {
134-
// 增加该字节1
135-
result[i] += 1;
136-
137-
// 检查是否出现溢出
138-
if ((result[i] & 0xFF) != 0) {
139-
// 没有溢出,直接返回结果,不必再加了
140-
return result;
141-
}
142-
143-
// 若溢出,需要下一个字节处理进位(当前字节已0)
144-
result[i] = 0;
145-
}
146-
147-
// 如果循环结束还没有返回,说明最高位有进位
148-
byte[] extendedResult = new byte[result.length + 1];
149-
extendedResult[0] = 1; // 设置第一个字节,代表进位
150-
151-
return extendedResult;
152-
}
153113

154114
/*
155115
* Check filter.

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,12 @@ protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTabl
145145
}
146146

147147
@Override
148-
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery, String tableName) throws Exception {
148+
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery,
149+
String tableName)
150+
throws Exception {
149151
return buildPartitions(client, tableQuery, tableName);
150152
}
151-
153+
152154
@Override
153155
public boolean next() throws Exception {
154156
checkStatus();
@@ -192,6 +194,7 @@ public boolean next() throws Exception {
192194
this.tableQuery.adjustStartKey(currentStartKey);
193195
setExpectant(refreshPartition(tableQuery, tableName));
194196
it = expectant.entrySet().iterator();
197+
continue;
195198
} else {
196199
throw e;
197200
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,12 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
7979
}
8080

8181
@Override
82-
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery, String tableName) throws Exception {
82+
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery,
83+
String tableName)
84+
throws Exception {
8385
return buildPartitions(client, tableQuery, tableName);
8486
}
85-
87+
8688
/**
8789
* Get client.
8890
* @return client

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ public ObTableSingleOpResult[] executeInternal() throws Exception {
675675
long getTableTime = System.currentTimeMillis();
676676
final Map<Object, Object> context = ThreadLocalMap.getContextMap();
677677
final int maxRetries = obTableClient.getRuntimeRetryTimes();
678-
678+
679679
if (executorService != null && !executorService.isShutdown() && lsOperations.size() > 1) {
680680
// execute sub-batch operation in parallel
681681
final ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(executorService,
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2023 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.util;
19+
20+
public class ByteUtil {
21+
/**
22+
* Compares two byte arrays lexicographically.
23+
*
24+
* This method interprets each byte as an unsigned value between 0 and 255
25+
* and performs a comparison on the arrays byte-by-byte. If two corresponding
26+
* bytes differ, it returns the difference between the first non-equal byte
27+
* values (treated as unsigned). If one array is a prefix of the other,
28+
* the shorter array is considered to be less than the longer one.
29+
*
30+
* @param array1 the first byte array to be compared
31+
* @param array2 the second byte array to be compared
32+
* @return a negative integer if array1 comes before array2,
33+
* a positive integer if array1 comes after array2,
34+
* or zero if they are equal
35+
*/
36+
static public int compareByteArrays(byte[] array1, byte[] array2) {
37+
for (int i = 0; i < Math.min(array1.length, array2.length); i++) {
38+
int a = (array1[i] & 0xFF);
39+
int b = (array2[i] & 0xFF);
40+
if (a != b) {
41+
return a - b;
42+
}
43+
}
44+
return array1.length - array2.length;
45+
}
46+
47+
/**
48+
* Increments a byte array treated as an unsigned integer by one.
49+
*
50+
* This method treats the input byte array as a non-negative big-endian
51+
* integer, and increments its value by 1. The most significant byte is at
52+
* the beginning of the array. If the increment operation causes an overflow,
53+
* the resulting array will be extended to accommodate the carry.
54+
*
55+
* @param input the byte array representing a non-negative integer
56+
* @return a new byte array representing the incremented value
57+
*/
58+
static public byte[] incrementByteArray(byte[] input) {
59+
if (input == null || input.length == 0) {
60+
return new byte[] { 1 };
61+
}
62+
byte[] result = input.clone();
63+
for (int i = result.length - 1; i >= 0; i--) {
64+
result[i] += 1;
65+
66+
if ((result[i] & 0xFF) != 0) {
67+
return result;
68+
}
69+
result[i] = 0;
70+
}
71+
72+
byte[] extendedResult = new byte[result.length + 1];
73+
extendedResult[0] = 1;
74+
return extendedResult;
75+
}
76+
77+
/**
78+
* Decrements a byte array treated as an unsigned integer by one.
79+
*
80+
* This method treats the input byte array as a non-negative big-endian
81+
* integer, and decrements its value by 1. If the entire array represents
82+
* zero, the function should handle it appropriately, typically returning
83+
* an array representing zero.
84+
*
85+
* @param input the byte array representing a non-negative integer
86+
* @return a new byte array representing the decremented value
87+
* @throws IllegalArgumentException if the input represents zero
88+
*/
89+
static public byte[] decrementByteArray(byte[] input) {
90+
if (input == null || input.length == 0 || isZero(input)) {
91+
throw new IllegalArgumentException("Input array must represent a positive integer.");
92+
}
93+
94+
byte[] result = input.clone();
95+
96+
// Traverse the array from the least significant byte to the most
97+
for (int i = result.length - 1; i >= 0; i--) {
98+
if ((result[i] & 0xFF) > 0) {
99+
result[i] -= 1;
100+
return result;
101+
}
102+
103+
// If the byte is zero, we need to borrow from the next
104+
result[i] = (byte) 0xFF; // Set current byte to 255 after borrow
105+
}
106+
107+
// Handle cases where we subtract from a number like 1000 -> 999
108+
if (result[0] == 0) {
109+
byte[] shortenedResult = new byte[result.length - 1];
110+
System.arraycopy(result, 1, shortenedResult, 0, shortenedResult.length);
111+
return shortenedResult;
112+
}
113+
114+
return result;
115+
}
116+
117+
/**
118+
* Helper method to check if a byte array represents zero.
119+
*
120+
* @param array the byte array to check
121+
* @return true if the byte array is zero, false otherwise
122+
*/
123+
static private boolean isZero(byte[] array) {
124+
for (byte b : array) {
125+
if (b != 0) {
126+
return false;
127+
}
128+
}
129+
return true;
130+
}
131+
132+
}

0 commit comments

Comments
 (0)