Skip to content

Commit 2056891

Browse files
miyuan-ljrstuBirdFly
authored andcommitted
feature: hbase compatible 2.x (#242)
* support hbase scan renewLease (#211) * hbase tablename bug fix (#237) * hbase empty family scan error (#239) * hbase tablename bug fix * hbase empty family scan error --------- Co-authored-by: stuBirdFly <[email protected]>
1 parent 78ada50 commit 2056891

File tree

5 files changed

+38
-13
lines changed

5 files changed

+38
-13
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3007,16 +3007,13 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
30073007
} else if (request instanceof ObTableQueryRequest) {
30083008
// TableGroup -> TableName
30093009
String tableName = request.getTableName();
3010-
tableName = getPhyTableNameFromTableGroup(((ObTableQueryRequest) request), tableName);
30113010
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
30123011
((ObTableQueryRequest) request).getTableQuery(), this);
30133012
tableQuery.setEntityType(request.getEntityType());
30143013
return new ObClusterTableQuery(tableQuery).executeInternal();
30153014
} else if (request instanceof ObTableQueryAsyncRequest) {
30163015
// TableGroup -> TableName
30173016
String tableName = request.getTableName();
3018-
tableName = getPhyTableNameFromTableGroup(
3019-
((ObTableQueryAsyncRequest) request).getObTableQueryRequest(), tableName);
30203017
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
30213018
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
30223019
tableQuery.setEntityType(request.getEntityType());

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
274274
return result;
275275
}
276276

277+
/*
278+
* RenewLease.
279+
*/
280+
public void renewLease() throws Exception {
281+
throw new IllegalStateException("renew only support stream query");
282+
}
283+
277284
/*
278285
* Next.
279286
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.Map;
2222

2323
public enum ObQueryOperationType {
24-
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2);
24+
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2), QUERY_RENEW(3);
2525

2626
private int value;
2727
private static Map<Integer, ObQueryOperationType> map = new HashMap<Integer, ObQueryOperationType>();

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,32 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
182182
return buildPartitions(client, tableQuery, tableName);
183183
}
184184

185+
// This function is designed for HBase-type requests.
186+
// It is used to extend the session duration of a scan
187+
@Override
188+
public void renewLease() throws Exception {
189+
if (!isEnd() && !expectant.isEmpty()) {
190+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
191+
.iterator();
192+
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
193+
ObPair<Long, ObTableParam> partIdWithObTable = lastEntry.getValue();
194+
// try access new partition, async will not remove useless expectant
195+
ObTableParam obTableParam = partIdWithObTable.getRight();
196+
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
197+
198+
// refresh request info
199+
queryRequest.setPartitionId(obTableParam.getPartitionId());
200+
queryRequest.setTableId(obTableParam.getTableId());
201+
202+
// refresh async query request
203+
asyncRequest.setQueryType(ObQueryOperationType.QUERY_RENEW);
204+
asyncRequest.setQuerySessionId(sessionId);
205+
executeAsync(partIdWithObTable, asyncRequest);
206+
} else {
207+
throw new ObTableException("query end or expectant is null");
208+
}
209+
}
210+
185211
@Override
186212
public boolean next() throws Exception {
187213
checkStatus();

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2525
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
2626
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
27+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
2829
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
2930
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
@@ -192,14 +193,6 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback<Abstract
192193
}
193194
}
194195

195-
// set correct table group name for hbase
196-
if (tableQuery.isHbaseQuery()
197-
&& obTableClient.getTableGroupInverted().containsKey(tableName)
198-
&& tableName.equalsIgnoreCase(obTableClient.getTableGroupCache().get(
199-
obTableClient.getTableGroupInverted().get(tableName)))) {
200-
tableName = obTableClient.getTableGroupInverted().get(tableName);
201-
}
202-
203196
// init query stream result
204197
AbstractQueryStreamResult streamResult = callable.execute();
205198

@@ -267,7 +260,9 @@ public Map<Long, ObPair<Long, ObTableParam>> initPartitions(ObTableQuery tableQu
267260
end[i] = endKey.getObj(i).isMinObj() || endKey.getObj(i).isMaxObj() ?
268261
endKey.getObj(i) : endKey.getObj(i).getValue();
269262
}
270-
263+
if (this.entityType == ObTableEntityType.HKV && obTableClient.isTableGroupName(tableName)) {
264+
indexTableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, false);
265+
}
271266
ObBorderFlag borderFlag = range.getBorderFlag();
272267
List<ObPair<Long, ObTableParam>> pairs = this.obTableClient.getTables(indexTableName,
273268
tableQuery, start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),

0 commit comments

Comments
 (0)