Skip to content

Commit e2649e0

Browse files
committed
query refresh partitionTables
1 parent de8f71f commit e2649e0

File tree

5 files changed

+235
-32
lines changed

5 files changed

+235
-32
lines changed

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
3030
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
3131
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
32+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
3233
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
3334
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableStreamRequest;
3435
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.QueryStreamResult;
@@ -50,6 +51,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
5051
protected volatile boolean closed = false;
5152
protected volatile List<ObObj> row = null;
5253
protected volatile int rowIndex = -1;
54+
// 调整它的startKey
5355
protected ObTableQuery tableQuery;
5456
protected long operationTimeout = -1;
5557
protected String tableName;
@@ -63,6 +65,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
6365
protected LinkedList<List<ObObj>> cacheRows = new LinkedList<List<ObObj>>();
6466
private LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>> partitionLastResult = new LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>>();
6567
private ObReadConsistency readConsistency = ObReadConsistency.STRONG;
68+
public ObRowKey currentStartKey;
6669

6770
/*
6871
* Get pcode.
@@ -243,6 +246,11 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
243246
"tablename:{} partition id:{} stream query retry while meet Exception needing refresh, errorCode: {} , retry times {}",
244247
indexTableName, partIdWithIndex.getLeft(),
245248
((ObTableException) e).getErrorCode(), tryTimes, e);
249+
// tablet not exists, refresh table entry
250+
if (e instanceof ObTableNeedFetchAllException) {
251+
client.getOrRefreshTableEntry(tableName, true, true, true);
252+
throw e;
253+
}
246254
} else {
247255
client.calculateContinuousFailure(indexTableName, e.getMessage());
248256
throw e;
@@ -265,12 +273,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
265273
/*
266274
* Next.
267275
*/
276+
// 这个函数中,任何的计算结果都会填充到cacheRows
268277
public boolean next() throws Exception {
269278
checkStatus();
270279
lock.lock();
271280
try {
272281
// firstly, refer to the cache
273-
if (cacheRows.size() > 0) {
282+
if (!cacheRows.isEmpty()) {
274283
nextRow();
275284
return true;
276285
}
@@ -293,6 +302,40 @@ public boolean next() throws Exception {
293302
// lastly, refer to the new partition
294303
boolean hasNext = false;
295304
List<Map.Entry<Long, ObPair<Long, ObTableParam>>> referPartition = new ArrayList<Map.Entry<Long, ObPair<Long, ObTableParam>>>();
305+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet().iterator();
306+
while (it.hasNext()) {
307+
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
308+
referPartition.add(entry);
309+
try {
310+
// Mark the refer partition
311+
referPartition.add(entry);
312+
313+
// Try accessing the new partition
314+
ObTableQueryResult tableQueryResult = (ObTableQueryResult) referToNewPartition(entry.getValue());
315+
316+
if (tableQueryResult.getRowCount() == 0) {
317+
continue;
318+
}
319+
320+
hasNext = true;
321+
nextRow();
322+
break;
323+
324+
} catch (Exception e) {
325+
if (e instanceof ObTableNeedFetchAllException) {
326+
// Adjust the start key and refresh the expectant
327+
this.tableQuery.adjustStartKey(currentStartKey);
328+
setExpectant(refreshPartition(tableQuery, tableName));
329+
330+
// Reset the iterator to start over
331+
it = expectant.entrySet().iterator();
332+
referPartition.clear(); // Clear the referPartition if needed
333+
} else {
334+
throw e;
335+
}
336+
}
337+
}
338+
296339
for (Map.Entry<Long, ObPair<Long, ObTableParam>> entry : expectant.entrySet()) {
297340
// mark the refer partition
298341
referPartition.add(entry);
@@ -317,9 +360,59 @@ public boolean next() throws Exception {
317360
}
318361
}
319362

363+
protected Map<Long, ObPair<Long, ObTableParam>> buildPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
364+
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new HashMap<>();
365+
String indexName = tableQuery.getIndexName();
366+
String indexTableName = null;
367+
368+
if (!client.isOdpMode()) {
369+
indexTableName = client.getIndexTableName(tableName, indexName, tableQuery.getScanRangeColumns(), false);
370+
}
371+
372+
for (ObNewRange range : tableQuery.getKeyRanges()) {
373+
ObRowKey startKey = range.getStartKey();
374+
int startKeySize = startKey.getObjs().size();
375+
ObRowKey endKey = range.getEndKey();
376+
int endKeySize = endKey.getObjs().size();
377+
Object[] start = new Object[startKeySize];
378+
Object[] end = new Object[endKeySize];
379+
380+
for (int i = 0; i < startKeySize; i++) {
381+
start[i] = startKey.getObj(i).isMinObj() || startKey.getObj(i).isMaxObj() ?
382+
startKey.getObj(i) : startKey.getObj(i).getValue();
383+
}
384+
385+
for (int i = 0; i < endKeySize; i++) {
386+
end[i] = endKey.getObj(i).isMinObj() || endKey.getObj(i).isMaxObj() ?
387+
endKey.getObj(i) : endKey.getObj(i).getValue();
388+
}
389+
390+
ObBorderFlag borderFlag = range.getBorderFlag();
391+
List<ObPair<Long, ObTableParam>> pairs = client.getTables(indexTableName,
392+
tableQuery, start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
393+
false, false);
394+
395+
if (tableQuery.getScanOrder() == ObScanOrder.Reverse) {
396+
for (int i = pairs.size() - 1; i >= 0; i--) {
397+
partitionObTables.put(pairs.get(i).getLeft(), pairs.get(i));
398+
}
399+
} else {
400+
for (ObPair<Long, ObTableParam> pair : pairs) {
401+
partitionObTables.put(pair.getLeft(), pair);
402+
}
403+
}
404+
}
405+
406+
return partitionObTables;
407+
}
408+
409+
// 上层会不断的调getRow, 也就是不断的取出缓存,所以可以在这里给row赋值的时候顺便把这个row记录下来,用来作为最后拿到的key
320410
protected void nextRow() {
321411
rowIndex = rowIndex + 1;
322412
row = cacheRows.poll();
413+
if (row != null) {
414+
currentStartKey = (ObRowKey) row.get(0).getValue();
415+
}
323416
}
324417

325418
protected void checkStatus() throws IllegalStateException {
@@ -397,6 +490,7 @@ protected abstract ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdW
397490
protected abstract ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partIdWithObTable,
398491
ObPayload streamRequest)
399492
throws Exception;
493+
protected abstract Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery, String tableName) throws Exception;
400494

401495
protected void cacheResultRows(ObTableQueryResult tableQueryResult) {
402496
cacheRows.addAll(tableQueryResult.getPropertiesRows());

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
2222
import com.alipay.oceanbase.rpc.table.ObKVParams;
2323
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
24+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
2426
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle;
2527
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
2628
import com.alipay.oceanbase.rpc.util.Serialization;
@@ -69,6 +71,86 @@ public class ObTableQuery extends AbstractPayload {
6971

7072
private ObKVParams obKVParams;
7173

74+
public void adjustStartKey(ObRowKey key) throws IllegalArgumentException {
75+
List<ObNewRange> keyRanges = getKeyRanges();
76+
for (ObNewRange range : keyRanges) {
77+
if (isKeyInRange(range, key)) {
78+
byte[] bytes = parseStartKeyToBytes(key);
79+
ObRowKey newStartKey = ObRowKey.getInstance(new Object[]{incrementByteArray(bytes), ObObj.getMin(), ObObj.getMin()});
80+
range.setStartKey(newStartKey);
81+
return;
82+
}
83+
}
84+
throw new IllegalArgumentException("Key not found in any KeyRange.");
85+
}
86+
87+
private byte[] parseStartKeyToBytes(ObRowKey key) {
88+
if (key == null || key.getObjs() == null || key.getObjs().isEmpty()) {
89+
return new byte[0];
90+
}
91+
92+
ObObj obObjKey = key.getObjs().get(0);
93+
if (obObjKey.getValue() instanceof byte[]) {
94+
return (byte[]) obObjKey.getValue();
95+
}
96+
97+
throw new IllegalArgumentException("The start key does not contain a byte[] value.");
98+
}
99+
100+
private boolean isKeyInRange(ObNewRange range, ObRowKey key) {
101+
byte[] startKeyBytes = parseStartKeyToBytes(range.getStartKey());
102+
byte[] endKeyBytes = parseStartKeyToBytes(range.getEndKey());
103+
byte[] keyBytes = parseStartKeyToBytes(key);
104+
105+
int startComparison = compareByteArrays(startKeyBytes, keyBytes);
106+
int endComparison = compareByteArrays(endKeyBytes, keyBytes);
107+
108+
// 假设范围是 [start, end),那么需要:
109+
boolean withinStart = startComparison <= 0; // key >= start
110+
boolean withinEnd = endComparison > 0; // key < end
111+
112+
return withinStart && withinEnd;
113+
}
114+
115+
private int compareByteArrays(byte[] array1, byte[] array2) {
116+
for (int i = 0; i < Math.min(array1.length, array2.length); i++) {
117+
int a = (array1[i] & 0xFF);
118+
int b = (array2[i] & 0xFF);
119+
if (a != b) {
120+
return a - b;
121+
}
122+
}
123+
return array1.length - array2.length;
124+
}
125+
126+
private byte[] incrementByteArray(byte[] input) {
127+
if (input == null || input.length == 0) {
128+
return new byte[]{1}; // 仅处理空输入情况
129+
}
130+
131+
byte[] result = input.clone(); // 复制数组以保持原数据不变
132+
133+
for (int i = result.length - 1; i >= 0; i--) {
134+
// 增加该字节1
135+
result[i] += 1;
136+
137+
// 检查是否出现溢出
138+
if ((result[i] & 0xFF) != 0) {
139+
// 没有溢出,直接返回结果,不必再加了
140+
return result;
141+
}
142+
143+
// 若溢出,需要下一个字节处理进位(当前字节已0)
144+
result[i] = 0;
145+
}
146+
147+
// 如果循环结束还没有返回,说明最高位有进位
148+
byte[] extendedResult = new byte[result.length + 1];
149+
extendedResult[0] = 1; // 设置第一个字节,代表进位
150+
151+
return extendedResult;
152+
}
153+
72154
/*
73155
* Check filter.
74156
*/

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@
2020
import com.alipay.oceanbase.rpc.ObTableClient;
2121
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2222
import com.alipay.oceanbase.rpc.exception.ObTableException;
23+
import com.alipay.oceanbase.rpc.exception.ObTableNeedFetchAllException;
2324
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2425
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2526
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
26-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
27-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryRequest;
28-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryResult;
27+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
2928
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObQueryOperationType;
3029
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
3130
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult;
@@ -145,6 +144,11 @@ protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTabl
145144
}
146145
}
147146

147+
@Override
148+
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery, String tableName) throws Exception {
149+
return buildPartitions(client, tableQuery, tableName);
150+
}
151+
148152
@Override
149153
public boolean next() throws Exception {
150154
checkStatus();
@@ -180,8 +184,18 @@ public boolean next() throws Exception {
180184
.iterator();
181185
while (it.hasNext()) {
182186
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
183-
// try access new partition, async will not remove useless expectant
184-
referToNewPartition(entry.getValue());
187+
try {
188+
// try access new partition, async will not remove useless expectant
189+
referToNewPartition(entry.getValue());
190+
} catch (Exception e) {
191+
if (e instanceof ObTableNeedFetchAllException) {
192+
this.tableQuery.adjustStartKey(currentStartKey);
193+
setExpectant(refreshPartition(tableQuery, tableName));
194+
it = expectant.entrySet().iterator();
195+
} else {
196+
throw e;
197+
}
198+
}
185199

186200
// remove useless expectant if it is end
187201
if (isEnd())

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2323
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
2526
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryRequest;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryResult;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult;
2829
import com.alipay.oceanbase.rpc.table.ObTableParam;
2930
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
3031
import org.slf4j.Logger;
3132

33+
import java.util.Map;
3234
import java.util.concurrent.atomic.AtomicReference;
3335

3436
public class ObTableClientQueryStreamResult extends AbstractQueryStreamResult {
@@ -76,6 +78,11 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
7678
throw new IllegalArgumentException("not support this execute");
7779
}
7880

81+
@Override
82+
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery, String tableName) throws Exception {
83+
return buildPartitions(client, tableQuery, tableName);
84+
}
85+
7986
/**
8087
* Get client.
8188
* @return client

0 commit comments

Comments
 (0)