Skip to content

Commit 7e587c0

Browse files
authored
Fix scan sess missing transfer and single cf table put (#375)
* generate new query request when hash_not_exist during query_next * fix isKeyInRange when encounter MinObObj or MaxObObj * fix hbase put sinlge cf table
1 parent 95cca74 commit 7e587c0

File tree

8 files changed

+206
-103
lines changed

8 files changed

+206
-103
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,10 @@ public void dealWithRpcTimeoutForBatchTablet(ObServerAddr addr, String tableName
11931193
*/
11941194
public String tryGetTableNameFromTableGroupCache(final String tableGroupName,
11951195
final boolean refresh) throws Exception {
1196-
return tableRoute.tryGetTableNameFromTableGroupCache(tableGroupName, refresh);
1196+
if (isTableGroupName(tableGroupName)) {
1197+
return tableRoute.tryGetTableNameFromTableGroupCache(tableGroupName, refresh);
1198+
}
1199+
return tableGroupName;
11971200
}
11981201

11991202
/**
@@ -2401,17 +2404,13 @@ public ObPayload execute(final ObHbaseRequest request) throws Exception {
24012404
// get the first cell from the first cfRows to route
24022405
// use the first table in tablegroup to route
24032406
String realTableName = null;
2404-
try {
2405-
realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false);
2406-
} catch (ObTableNotExistException e) {
2407-
if (request.getCfRows().size() != 1) {
2408-
throw new ObTableUnexpectedException("multi-cf operations must create tablegroup and binding tables");
2409-
} else {
2410-
realTableName = request.getCfRows().get(0).getRealTableName();
2411-
}
2407+
if (request.getCfRows().isEmpty()) {
2408+
throw new ObTableUnexpectedException("no cf rows");
24122409
}
2413-
if (realTableName == null) {
2414-
throw new ObTableUnexpectedException("realTableName is null");
2410+
if (request.getCfRows().size() > 1) {
2411+
realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false);
2412+
} else {
2413+
realTableName = request.getCfRows().get(0).getRealTableName();
24152414
}
24162415
int keyIdx = request.getCfRows().get(0).getKeyIndex(0);
24172416
row.add("K", request.getKeys().get(keyIdx).getValue());

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
123123
boolean isNeedRefreshMeta = false;
124124
ObRpcResultCode resultCode = new ObRpcResultCode();
125125
resultCode.decode(buf);
126-
logger.debug("require_rerouting_: {}, need_refresh_kv_meta_: {}"
126+
logger.debug("require_rerouting: {}, need_refresh_kv_meta: {}"
127127
, response.getHeader().isRoutingWrong(), response.getHeader().isNeedRefreshKvMeta());
128128
if (response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
129129
if (resultCode.getRcode() != 0) {

src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ public static ObTableException convertToObTableException(String host, int port,
8686
+ "]" + "[" + resultCodes.name() + "]" + "["
8787
+ errMsg + "]" + "[" + server + "]" + "["
8888
+ trace + "]", resultCodes.errorCode);
89+
} else if (resultCodes.errorCode == OB_KV_SESS_NOT_EXIST.errorCode) {
90+
return new ObTableSessionNotExistException("[" + String.valueOf(resultCodes.errorCode)
91+
+ "]" + "[" + resultCodes.name() + "]" + "["
92+
+ errMsg + "]" + "[" + server + "]" + "["
93+
+ trace + "]", resultCodes.errorCode);
8994
} else {
9095
// [errCode][errCodeName][errMsg][server][trace]
9196
return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.alipay.oceanbase.rpc.exception;
2+
3+
public class ObTableSessionNotExistException extends ObTableException {
4+
/*
5+
* Ob table query session not in server exception.
6+
*/
7+
public ObTableSessionNotExistException() {
8+
}
9+
10+
/*
11+
* Ob table query session not in server exception.
12+
*/
13+
public ObTableSessionNotExistException(int errorCode) {
14+
super(errorCode);
15+
}
16+
17+
/*
18+
* Ob table query session not in server exception.
19+
*/
20+
public ObTableSessionNotExistException(String message, int errorCode) {
21+
super(message, errorCode);
22+
}
23+
24+
/*
25+
* Ob table query session not in server exception.
26+
*/
27+
public ObTableSessionNotExistException(String message) {
28+
super(message);
29+
}
30+
31+
/*
32+
* Ob table query session not in server exception.
33+
*/
34+
public ObTableSessionNotExistException(String message, Throwable cause) {
35+
super(message, cause);
36+
}
37+
38+
/*
39+
* Ob table query session not in server exception.
40+
*/
41+
public ObTableSessionNotExistException(Throwable cause) {
42+
super(cause);
43+
}
44+
45+
/*
46+
* Is need refresh table entry.
47+
*/
48+
public boolean isNeedRefreshTableEntry() {
49+
return false;
50+
}
51+
}

src/main/java/com/alipay/oceanbase/rpc/location/model/TableGroupCache.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,11 @@ private String refreshTableNameByTableGroup(String physicalTableName, String tab
150150

151151
public void eraseTableGroupFromCache(String tableGroupName) {
152152
// clear table group cache
153-
TableGroupInverted.remove(TableGroupCache.get(tableGroupName));
153+
String tableName = TableGroupCache.get(tableGroupName);
154+
if (tableName != null) {
155+
// maybe in concurrent situation, other thread has erased name already
156+
TableGroupInverted.remove(tableName);
157+
}
154158
TableGroupCache.remove(tableGroupName);
155159
TableGroupCacheLocks.remove(tableGroupName);
156160
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ public enum ResultCodes {
749749
OB_KV_HBASE_NAMESPACE_NOT_FOUND(-10521), //
750750
OB_KV_HBASE_TABLE_EXISTS(-10522), //
751751
OB_KV_HBASE_TABLE_NOT_EXISTS(-10523), //
752-
OB_KV_HBASE_TABLE_NOT_DISABLED(-10524), //
752+
OB_KV_SESS_NOT_EXIST(-10524), //
753753
OB_KV_ODP_TIMEOUT(-10650), //
754754
OB_ERR_KV_ROUTE_ENTRY_EXPIRE(-10653);
755755

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,31 @@ private byte[] parseStartKeyToBytes(List<ObObj> key) {
108108
}
109109

110110
private boolean isKeyInRange(ObNewRange range, List<ObObj> key) {
111-
byte[] startKeyBytes = parseStartKeyToBytes(range.getStartKey().getObjs());
112-
byte[] endKeyBytes = parseStartKeyToBytes(range.getEndKey().getObjs());
113-
byte[] keyBytes = parseStartKeyToBytes(key);
111+
if (range.getStartKey().getObj(0).isMinObj() && range.getEndKey().getObj(0).isMaxObj()) {
112+
return true;
113+
} else if (range.getStartKey().getObj(0).isMinObj()) {
114+
byte[] keyBytes = parseStartKeyToBytes(key);
115+
byte[] endKeyBytes = parseStartKeyToBytes(range.getEndKey().getObjs());
116+
int endComparison = compareByteArrays(endKeyBytes, keyBytes);
117+
return endComparison > 0;
118+
} else if (range.getEndKey().getObj(0).isMaxObj()) {
119+
byte[] keyBytes = parseStartKeyToBytes(key);
120+
byte[] startKeyBytes = parseStartKeyToBytes(range.getStartKey().getObjs());
121+
int startComparison = compareByteArrays(startKeyBytes, keyBytes);
122+
return startComparison <= 0;
123+
} else {
124+
byte[] startKeyBytes = parseStartKeyToBytes(range.getStartKey().getObjs());
125+
byte[] endKeyBytes = parseStartKeyToBytes(range.getEndKey().getObjs());
126+
byte[] keyBytes = parseStartKeyToBytes(key);
114127

115-
int startComparison = compareByteArrays(startKeyBytes, keyBytes);
116-
int endComparison = compareByteArrays(endKeyBytes, keyBytes);
128+
int startComparison = compareByteArrays(startKeyBytes, keyBytes);
129+
int endComparison = compareByteArrays(endKeyBytes, keyBytes);
117130

118-
boolean withinStart = startComparison <= 0;
119-
boolean withinEnd = endComparison > 0;
131+
boolean withinStart = startComparison <= 0;
132+
boolean withinEnd = endComparison > 0;
120133

121-
return withinStart && withinEnd;
134+
return withinStart && withinEnd;
135+
}
122136
}
123137

124138

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 111 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alipay.oceanbase.rpc.exception.*;
2222
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2323
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
24+
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
2425
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2526
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
@@ -176,6 +177,103 @@ protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTabl
176177
}
177178
}
178179

180+
public boolean queryLastStreamResultInNext() throws Exception {
181+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
182+
.iterator();
183+
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
184+
try {
185+
// try access new partition, async will not remove useless expectant
186+
referToLastStreamResult(lastEntry.getValue());
187+
} catch (Exception e) {
188+
if (shouldRetry(e)) {
189+
String realTableName = client.getPhyTableNameFromTableGroup(entityType,
190+
tableName);
191+
TableEntry entry = client.getOrRefreshTableEntry(realTableName, false);
192+
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
193+
if (entry.isPartitionTable()
194+
&& entry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ONE
195+
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
196+
.isRangePart()) {
197+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
198+
.adjustStartKey(currentStartKey);
199+
setExpectant(refreshPartition(this.asyncRequest
200+
.getObTableQueryRequest().getTableQuery(), realTableName));
201+
setEnd(true);
202+
} else {
203+
// non one-level-range partitioned table does not retry, inform user to rescan
204+
throw e;
205+
}
206+
} else {
207+
throw e;
208+
}
209+
}
210+
// remove useless expectant if it is end
211+
if (isEnd()) {
212+
it.remove();
213+
}
214+
if (!cacheRows.isEmpty()) {
215+
nextRow();
216+
return true;
217+
}
218+
return false;
219+
}
220+
221+
public boolean queryNewStreamResultInNext() throws Exception {
222+
boolean hasNext = false;
223+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
224+
.iterator();
225+
int retryTimes = 0;
226+
long startExecute = System.currentTimeMillis();
227+
while (it.hasNext()) {
228+
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
229+
try {
230+
// try access new partition, async will not remove useless expectant
231+
referToNewPartition(entry.getValue());
232+
} catch (Exception e) {
233+
if (shouldRetry(e)) {
234+
String realTableName = client.getPhyTableNameFromTableGroup(entityType,
235+
tableName);
236+
TableEntry tableEntry = client.getOrRefreshTableEntry(realTableName, false);
237+
if (tableEntry.isPartitionTable()
238+
&& tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ONE
239+
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
240+
.isRangePart()) {
241+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
242+
.adjustStartKey(currentStartKey);
243+
setExpectant(refreshPartition(this.asyncRequest
244+
.getObTableQueryRequest().getTableQuery(), realTableName));
245+
} else {
246+
// non one-level-range partitioned table does not retry, inform user to rescan
247+
throw e;
248+
}
249+
it = expectant.entrySet().iterator();
250+
retryTimes++;
251+
long costMillis = System.currentTimeMillis() - startExecute;
252+
if (costMillis > client.getRuntimeMaxWait()) {
253+
RUNTIME.error("Fail to get refresh table entry response after {}",
254+
retryTimes);
255+
throw new ObTableTimeoutExcetion(
256+
"Fail to get refresh table entry response after " + retryTimes);
257+
}
258+
continue;
259+
} else {
260+
throw e;
261+
}
262+
}
263+
264+
// remove useless expectant if it is end
265+
if (isEnd()) {
266+
it.remove();
267+
}
268+
if (!cacheRows.isEmpty()) {
269+
hasNext = true;
270+
nextRow();
271+
break;
272+
}
273+
}
274+
return hasNext;
275+
}
276+
179277
@Override
180278
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery,
181279
String tableName)
@@ -222,94 +320,26 @@ public boolean next() throws Exception {
222320
return true;
223321
}
224322

323+
boolean hasNext = false;
225324
// secondly, refer to the last stream result
226325
if (!isEnd() && !expectant.isEmpty()) {
227-
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
228-
.iterator();
229-
230-
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
231326
try {
232-
// try access new partition, async will not remove useless expectant
233-
referToLastStreamResult(lastEntry.getValue());
234-
} catch (Exception e) {
235-
if (shouldRetry(e)) {
236-
String realTableName = client.getPhyTableNameFromTableGroup(entityType,
237-
tableName);
238-
TableEntry entry = client.getOrRefreshTableEntry(realTableName, false);
239-
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
240-
if (entry.isPartitionTable()
241-
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
242-
.isRangePart()) {
243-
this.asyncRequest.getObTableQueryRequest().getTableQuery()
244-
.adjustStartKey(currentStartKey);
245-
setExpectant(refreshPartition(this.asyncRequest
246-
.getObTableQueryRequest().getTableQuery(), realTableName));
247-
setEnd(true);
248-
}
249-
} else {
250-
throw e;
251-
}
252-
}
253-
// remove useless expectant if it is end
254-
if (isEnd())
255-
it.remove();
256-
257-
if (!cacheRows.isEmpty()) {
258-
nextRow();
259-
return true;
327+
hasNext = queryLastStreamResultInNext();
328+
} catch (ObTableSessionNotExistException e) {
329+
// session_id missing because the tablet has been transferred to new server
330+
// new server does not store the current session_id
331+
// only support range-partitioned table, check in server
332+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
333+
.adjustStartKey(currentStartKey);
334+
// just need to asjust startKey to anchor the correct position
335+
// no need to refresh partition id for session_id missing
336+
hasNext = queryNewStreamResultInNext();
260337
}
261338
}
262-
263339
// lastly, refer to the new partition
264-
boolean hasNext = false;
265-
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
266-
.iterator();
267-
int retryTimes = 0;
268-
long startExecute = System.currentTimeMillis();
269-
while (it.hasNext()) {
270-
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
271-
try {
272-
// try access new partition, async will not remove useless expectant
273-
referToNewPartition(entry.getValue());
274-
} catch (Exception e) {
275-
if (shouldRetry(e)) {
276-
String realTableName = client.getPhyTableNameFromTableGroup(entityType,
277-
tableName);
278-
TableEntry tableEntry = client.getOrRefreshTableEntry(realTableName, false);
279-
if (tableEntry.isPartitionTable()
280-
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
281-
.isRangePart()) {
282-
this.asyncRequest.getObTableQueryRequest().getTableQuery()
283-
.adjustStartKey(currentStartKey);
284-
setExpectant(refreshPartition(this.asyncRequest
285-
.getObTableQueryRequest().getTableQuery(), realTableName));
286-
}
287-
it = expectant.entrySet().iterator();
288-
retryTimes++;
289-
long costMillis = System.currentTimeMillis() - startExecute;
290-
if (costMillis > client.getRuntimeMaxWait()) {
291-
RUNTIME.error("Fail to get refresh table entry response after {}",
292-
retryTimes);
293-
throw new ObTableTimeoutExcetion(
294-
"Fail to get refresh table entry response after " + retryTimes);
295-
}
296-
continue;
297-
} else {
298-
throw e;
299-
}
300-
}
301-
302-
// remove useless expectant if it is end
303-
if (isEnd())
304-
it.remove();
305-
306-
if (!cacheRows.isEmpty()) {
307-
hasNext = true;
308-
nextRow();
309-
break;
310-
}
340+
if (!hasNext) {
341+
hasNext = queryNewStreamResultInNext();
311342
}
312-
313343
return hasNext;
314344
} finally {
315345
lock.unlock();

0 commit comments

Comments
 (0)