Skip to content

Commit 259b344

Browse files
authored
fix tablet_id error bug (#302)
* fix sub expr bug * fix scan bug
1 parent b7e7fec commit 259b344

File tree

1 file changed

+38
-24
lines changed

1 file changed

+38
-24
lines changed

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,17 @@ public void init() throws Exception {
8181
} catch (Exception e) {
8282
if (e instanceof ObTableNeedFetchAllException) {
8383
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
84-
.getTableQuery(), client.getPhyTableNameFromTableGroup(entityType, tableName)));
84+
.getTableQuery(), client.getPhyTableNameFromTableGroup(entityType,
85+
tableName)));
8586
it = expectant.entrySet().iterator();
8687
retryTimes++;
8788
if (retryTimes > maxRetries) {
8889
RUNTIME.error("Fail to get refresh table entry response after {}",
8990
retryTimes);
9091
throw new ObTableRetryExhaustedException(
9192
"Fail to get refresh table entry response after " + retryTimes
92-
+ "errorCode:" + ((ObTableNeedFetchAllException) e).getErrorCode());
93+
+ "errorCode:"
94+
+ ((ObTableNeedFetchAllException) e).getErrorCode());
9395

9496
}
9597
} else {
@@ -113,8 +115,8 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
113115
throws Exception {
114116
ObTableParam obTableParam = partIdWithObTable.getRight();
115117
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
116-
long partitionId = client.getServerCapacity().isSupportSecondaryPartition() ?
117-
INVALID_TABLET_ID : obTableParam.getPartitionId();
118+
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
119+
: obTableParam.getPartitionId();
118120
// refresh request info
119121
queryRequest.setPartitionId(partitionId);
120122
queryRequest.setTableId(obTableParam.getTableId());
@@ -141,7 +143,9 @@ protected ObTableQueryAsyncResult referToLastStreamResult(ObPair<Long, ObTablePa
141143
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
142144

143145
// refresh request info
144-
queryRequest.setPartitionId(obTableParam.getPartitionId());
146+
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
147+
: obTableParam.getPartitionId();
148+
queryRequest.setPartitionId(partitionId);
145149
queryRequest.setTableId(obTableParam.getTableId());
146150

147151
// refresh async query request
@@ -160,7 +164,9 @@ protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTabl
160164
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
161165

162166
// refresh request info
163-
queryRequest.setPartitionId(obTableParam.getPartitionId());
167+
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
168+
: obTableParam.getPartitionId();
169+
queryRequest.setPartitionId(partitionId);
164170
queryRequest.setTableId(obTableParam.getTableId());
165171

166172
// set end async query
@@ -188,15 +194,17 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
188194
public void renewLease() throws Exception {
189195
if (!isEnd() && !expectant.isEmpty()) {
190196
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
191-
.iterator();
197+
.iterator();
192198
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
193199
ObPair<Long, ObTableParam> partIdWithObTable = lastEntry.getValue();
194200
// try access new partition, async will not remove useless expectant
195201
ObTableParam obTableParam = partIdWithObTable.getRight();
196202
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
197203

198204
// refresh request info
199-
queryRequest.setPartitionId(obTableParam.getPartitionId());
205+
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
206+
: obTableParam.getPartitionId();
207+
queryRequest.setPartitionId(partitionId);
200208
queryRequest.setTableId(obTableParam.getTableId());
201209

202210
// refresh async query request
@@ -230,20 +238,23 @@ public boolean next() throws Exception {
230238
referToLastStreamResult(lastEntry.getValue());
231239
} catch (Exception e) {
232240
if (e instanceof ObTableNeedFetchAllException) {
233-
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
234-
TableEntry entry = client.getOrRefreshTableEntry(realTableName, false, false, false);
241+
String realTableName = client.getPhyTableNameFromTableGroup(entityType,
242+
tableName);
243+
TableEntry entry = client.getOrRefreshTableEntry(realTableName, false,
244+
false, false);
235245
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
236246
if (ObGlobal.obVsnMajor() >= 4
237-
&& entry.isPartitionTable()
238-
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
247+
&& entry.isPartitionTable()
248+
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
249+
.isRangePart()) {
239250
this.asyncRequest.getObTableQueryRequest().getTableQuery()
240-
.adjustStartKey(currentStartKey);
241-
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
242-
.getTableQuery(), realTableName));
251+
.adjustStartKey(currentStartKey);
252+
setExpectant(refreshPartition(this.asyncRequest
253+
.getObTableQueryRequest().getTableQuery(), realTableName));
243254
setEnd(true);
244255
} else {
245-
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
246-
.getTableQuery(), realTableName));
256+
setExpectant(refreshPartition(this.asyncRequest
257+
.getObTableQueryRequest().getTableQuery(), realTableName));
247258
}
248259
} else {
249260
throw e;
@@ -271,15 +282,18 @@ public boolean next() throws Exception {
271282
referToNewPartition(entry.getValue());
272283
} catch (Exception e) {
273284
if (e instanceof ObTableNeedFetchAllException) {
274-
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
275-
TableEntry tableEntry = client.getOrRefreshTableEntry(realTableName, false, false, false);
285+
String realTableName = client.getPhyTableNameFromTableGroup(entityType,
286+
tableName);
287+
TableEntry tableEntry = client.getOrRefreshTableEntry(realTableName, false,
288+
false, false);
276289
if (ObGlobal.obVsnMajor() >= 4
277290
&& tableEntry.isPartitionTable()
278-
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
291+
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
292+
.isRangePart()) {
279293
this.asyncRequest.getObTableQueryRequest().getTableQuery()
280-
.adjustStartKey(currentStartKey);
281-
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
282-
.getTableQuery(), realTableName));
294+
.adjustStartKey(currentStartKey);
295+
setExpectant(refreshPartition(this.asyncRequest
296+
.getObTableQueryRequest().getTableQuery(), realTableName));
283297
}
284298
it = expectant.entrySet().iterator();
285299
retryTimes++;
@@ -366,7 +380,7 @@ public void close() throws Exception {
366380
closeLastStreamResult(lastEntry.getValue());
367381
}
368382
}
369-
383+
370384
public boolean isEnd() {
371385
return isEnd;
372386
}

0 commit comments

Comments
 (0)