Skip to content

Commit 8e0f120

Browse files
committed
fix scan_to_next_partition_test route error; fix scan_during_split_test result error
1 parent c48dadb commit 8e0f120

File tree

3 files changed

+25
-8
lines changed

3 files changed

+25
-8
lines changed

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
6565
protected LinkedList<List<ObObj>> cacheRows = new LinkedList<List<ObObj>>();
6666
private LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>> partitionLastResult = new LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>>();
6767
private ObReadConsistency readConsistency = ObReadConsistency.STRONG;
68+
// ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT]
6869
public List<ObObj> currentStartKey;
6970

7071
/*

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,18 @@ public void adjustStartKey(List<ObObj> key) throws IllegalArgumentException {
7777
List<ObNewRange> keyRanges = getKeyRanges();
7878
for (ObNewRange range : keyRanges) {
7979
if (key != null && isKeyInRange(range, key)) {
80-
byte[] bytes = parseStartKeyToBytes(key);
8180
ObRowKey newStartKey;
8281
if (getScanOrder() == ObScanOrder.Forward) {
83-
newStartKey = ObRowKey.getInstance(new Object[]{incrementByteArray(bytes), ObObj.getMin(), ObObj.getMin()});
82+
// get the real rowkey
83+
newStartKey = ObRowKey.getInstance(new Object[]{key.get(0).getValue(), ObObj.getMax(), ObObj.getMax()});
8484
} else {
85-
newStartKey = ObRowKey.getInstance(new Object[]{decrementByteArray(bytes), ObObj.getMax(), ObObj.getMax()});
85+
newStartKey = ObRowKey.getInstance(new Object[]{key.get(0).getValue(), ObObj.getMax(), ObObj.getMax()});
8686
}
8787
range.setStartKey(newStartKey);
8888
return;
8989
}
9090
}
91+
System.out.println("noting changed");
9192
/* keyRanges not changed */
9293
}
9394

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
3030
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult;
3131
import com.alipay.oceanbase.rpc.table.ObTableParam;
32+
import com.alipay.oceanbase.rpc.util.ByteUtil;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
3435

@@ -166,10 +167,22 @@ public boolean next() throws Exception {
166167
if (!isEnd() && !expectant.isEmpty()) {
167168
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
168169
.iterator();
169-
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
170-
// try access new partition, async will not remove useless expectant
171-
referToLastStreamResult(lastEntry.getValue());
172170

171+
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
172+
try {
173+
// try access new partition, async will not remove useless expectant
174+
referToLastStreamResult(lastEntry.getValue());
175+
} catch (Exception e) {
176+
if (e instanceof ObTableNeedFetchAllException) {
177+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
178+
.adjustStartKey(currentStartKey);
179+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
180+
.getTableQuery(), tableName));
181+
setEnd(true);
182+
} else {
183+
throw e;
184+
}
185+
}
173186
// remove useless expectant if it is end
174187
if (isEnd())
175188
it.remove();
@@ -191,8 +204,10 @@ public boolean next() throws Exception {
191204
referToNewPartition(entry.getValue());
192205
} catch (Exception e) {
193206
if (e instanceof ObTableNeedFetchAllException) {
194-
this.tableQuery.adjustStartKey(currentStartKey);
195-
setExpectant(refreshPartition(tableQuery, tableName));
207+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
208+
.adjustStartKey(currentStartKey);
209+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
210+
.getTableQuery(), tableName));
196211
it = expectant.entrySet().iterator();
197212
continue;
198213
} else {

0 commit comments

Comments
 (0)