Skip to content

Commit 05716ab

Browse files
authored
[Feat] support prefix scan in direct mode (#151)
1 parent c879b38 commit 05716ab

16 files changed

+548
-199
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
700700
tableEntryRefreshIntervalWait, route);
701701
} else if (null != callback.getKeyRanges()) {
702702
// using scan range
703-
obPair = getTable(tableName, callback.getKeyRanges(),
703+
obPair = getTable(tableName, new ObTableQuery(), callback.getKeyRanges(),
704704
needRefreshTableEntry, tableEntryRefreshIntervalWait, route);
705705
} else {
706706
throw new ObTableException("rowkey and scan range are null in mutation");
@@ -1359,18 +1359,18 @@ private long getPartition(TableEntry tableEntry, Object[] rowKey) {
13591359
/*
13601360
* Get logicId(partition id in 3.x) from giving range
13611361
*/
1362-
private List<Long> getPartitionsForLevelTwo(TableEntry tableEntry, Object[] start,
1363-
boolean startIncluded, Object[] end,
1364-
boolean endIncluded) throws Exception {
1362+
private List<Long> getPartitionsForLevelTwo(TableEntry tableEntry, List<String> scanRangeColumns,
1363+
Object[] start, boolean startIncluded,
1364+
Object[] end, boolean endIncluded) throws Exception {
13651365
if (tableEntry.getPartitionInfo().getLevel() != ObPartitionLevel.LEVEL_TWO) {
13661366
RUNTIME.error("getPartitionsForLevelTwo need ObPartitionLevel LEVEL_TWO");
13671367
throw new Exception("getPartitionsForLevelTwo need ObPartitionLevel LEVEL_TWO");
13681368
}
13691369

13701370
List<Long> partIds1 = tableEntry.getPartitionInfo().getFirstPartDesc()
1371-
.getPartIds(start, startIncluded, end, endIncluded);
1371+
.getPartIds(scanRangeColumns, start, startIncluded, end, endIncluded);
13721372
List<Long> partIds2 = tableEntry.getPartitionInfo().getSubPartDesc()
1373-
.getPartIds(start, startIncluded, end, endIncluded);
1373+
.getPartIds(scanRangeColumns, start, startIncluded, end, endIncluded);
13741374

13751375
List<Long> partIds = new ArrayList<Long>();
13761376
if (partIds1.isEmpty()) {
@@ -1478,7 +1478,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, bo
14781478
* @return ObPair of partId and table
14791479
* @throws Exception exception
14801480
*/
1481-
public ObPair<Long, ObTableParam> getTable(String tableName, List<ObNewRange> keyRanges, boolean refresh,
1481+
public ObPair<Long, ObTableParam> getTable(String tableName, ObTableQuery query, List<ObNewRange> keyRanges, boolean refresh,
14821482
boolean waitForRefresh, ObServerRoute route)
14831483
throws Exception {
14841484
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
@@ -1497,7 +1497,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, List<ObNewRange> ke
14971497
end[i] = endKey.getObj(i).getValue();
14981498
}
14991499
ObBorderFlag borderFlag = rang.getBorderFlag();
1500-
List<ObPair<Long, ObTableParam>> pairList = getTables(tableName, start,
1500+
List<ObPair<Long, ObTableParam>> pairList = getTables(tableName, query, start,
15011501
borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), false,
15021502
false);
15031503
for (ObPair<Long, ObTableParam> pair : pairList) {
@@ -1605,6 +1605,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEnt
16051605
* @throws Exception
16061606
*/
16071607
private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry tableEntry,
1608+
List<String> scanRangeColumns,
16081609
Object[] start,
16091610
boolean startIncluded,
16101611
Object[] end,
@@ -1620,13 +1621,13 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
16201621
return replicas;
16211622
} else if (tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ONE) {
16221623
List<Long> partIds = tableEntry.getPartitionInfo().getFirstPartDesc()
1623-
.getPartIds(start, startIncluded, end, endIncluded);
1624+
.getPartIds(scanRangeColumns, start, startIncluded, end, endIncluded);
16241625
for (Long partId : partIds) {
16251626
replicas.add(new ObPair<Long, ReplicaLocation>(partId, getPartitionLocation(
16261627
tableEntry, partId, route)));
16271628
}
16281629
} else if (tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_TWO) {
1629-
List<Long> partIds = getPartitionsForLevelTwo(tableEntry, start, startIncluded, end,
1630+
List<Long> partIds = getPartitionsForLevelTwo(tableEntry, scanRangeColumns, start, startIncluded, end,
16301631
endIncluded);
16311632
for (Long partId : partIds) {
16321633
replicas.add(new ObPair<Long, ReplicaLocation>(partId, getPartitionLocation(
@@ -1652,11 +1653,11 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
16521653
* @return list of ObPair of partId(logicId) and table obTableParams
16531654
* @throws Exception exception
16541655
*/
1655-
public List<ObPair<Long, ObTableParam>> getTables(String tableName, Object[] start,
1656+
public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery query, Object[] start,
16561657
boolean startInclusive, Object[] end,
16571658
boolean endInclusive, boolean refresh,
16581659
boolean waitForRefresh) throws Exception {
1659-
return getTables(tableName, start, startInclusive, end, endInclusive, refresh,
1660+
return getTables(tableName, query, start, startInclusive, end, endInclusive, refresh,
16601661
waitForRefresh, getRoute(false));
16611662
}
16621663

@@ -1673,18 +1674,27 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, Object[] sta
16731674
* @return list of ObPair of partId(logicId) and tableParam
16741675
* @throws Exception exception
16751676
*/
1676-
public List<ObPair<Long, ObTableParam>> getTables(String tableName, Object[] start,
1677+
public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery query, Object[] start,
16771678
boolean startInclusive, Object[] end,
16781679
boolean endInclusive, boolean refresh,
16791680
boolean waitForRefresh, ObServerRoute route)
16801681
throws Exception {
16811682

16821683
// 1. get TableEntry information
16831684
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh);
1685+
1686+
List<String> scanRangeColumns = query.getScanRangeColumns();
1687+
if (scanRangeColumns == null || scanRangeColumns.size() == 0) {
1688+
Map<String, Integer> tableRowKeyElement = tableEntry.getRowKeyElement();
1689+
if (tableRowKeyElement != null) {
1690+
scanRangeColumns = new ArrayList<>(tableRowKeyElement.keySet());
1691+
}
1692+
}
1693+
16841694
// 2. get replica location
16851695
// partIdWithReplicaList -> List<pair<logicId(partition id in 3.x), replica>>
16861696
List<ObPair<Long, ReplicaLocation>> partIdWithReplicaList = getPartitionReplica(tableEntry,
1687-
start, startInclusive, end, endInclusive, route);
1697+
scanRangeColumns, start, startInclusive, end, endInclusive, route);
16881698

16891699
// obTableParams -> List<Pair<logicId, obTableParams>>
16901700
List<ObPair<Long, ObTableParam>> obTableParams = new ArrayList<ObPair<Long, ObTableParam>>();
@@ -2774,7 +2784,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
27742784
end[i] = endKey.getObj(i).getValue();
27752785
}
27762786
ObBorderFlag borderFlag = rang.getBorderFlag();
2777-
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
2787+
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(), tableQuery,
27782788
start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
27792789
false, false);
27802790
for (ObPair<Long, ObTableParam> pair : pairList) {
@@ -3150,7 +3160,7 @@ public void addRowKeyElement(String tableName, String[] columns) {
31503160
if (tableName == null || tableName.length() == 0) {
31513161
throw new IllegalArgumentException("table name is null");
31523162
}
3153-
Map<String, Integer> rowKeyElement = new HashMap<String, Integer>();
3163+
Map<String, Integer> rowKeyElement = new LinkedHashMap<>();
31543164
for (int i = 0; i < columns.length; i++) {
31553165
rowKeyElement.put(columns[i], i);
31563166
}

src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@
2424
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2525

2626
import java.util.HashMap;
27+
import java.util.LinkedHashMap;
2728
import java.util.Map;
2829

2930
import static com.google.common.base.Preconditions.checkArgument;
3031

3132
public class TableEntry {
3233

33-
public static final Map<String, Integer> HBASE_ROW_KEY_ELEMENT = new HashMap<String, Integer>() {
34+
public static final Map<String, Integer> HBASE_ROW_KEY_ELEMENT = new LinkedHashMap<String, Integer>() {
3435
{
3536
put("K", 0);
3637
put("Q", 1);

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObHashPartDesc.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alipay.oceanbase.rpc.location.model.partition;
1919

2020
import com.alipay.oceanbase.rpc.exception.ObTablePartitionConsistentException;
21+
import com.alipay.oceanbase.rpc.mutation.Row;
2122
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObColumn;
2223
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2324
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
@@ -159,6 +160,81 @@ public List<Long> getPartIds(Object[] start, boolean startInclusive, Object[] en
159160
}
160161
}
161162

163+
@Override
164+
public List<Long> getPartIds(List<String> scanRangeColumns, Object[] start, boolean startInclusive,
165+
Object[] end, boolean endInclusive) throws IllegalArgumentException {
166+
try {
167+
if (start.length != end.length) {
168+
throw new IllegalArgumentException("length of start key and end key in range is not equal, " +
169+
"the start key: " + start + ", the end key: " + end);
170+
}
171+
172+
if (start.length == 1 && start[0] instanceof ObObj && ((ObObj) start[0]).isMinObj() &&
173+
end.length == 1 && end[0] instanceof ObObj && ((ObObj) end[0]).isMaxObj()) {
174+
return completeWorks;
175+
}
176+
177+
if (scanRangeColumns.size() != start.length) {
178+
throw new IllegalArgumentException("length of start key in range and scan range columns is not equal," +
179+
"the start key: " + start + ", the scan range columns: " + scanRangeColumns);
180+
}
181+
182+
Row startRow = new Row();
183+
Row endRow = new Row();
184+
for (int i = 0; i < scanRangeColumns.size(); i++) {
185+
startRow.add(scanRangeColumns.get(i), start[i]);
186+
endRow.add(scanRangeColumns.get(i), end[i]);
187+
}
188+
189+
// check whether partition key is Min or Max, should refactor after remove addRowkeyElement
190+
for (ObColumn partColumn : partColumns) {
191+
List<String> refColumns = partColumn.getRefColumnNames();
192+
for (String column : refColumns) {
193+
if (startRow.get(column) instanceof ObObj
194+
&& (((ObObj) startRow.get(column)).isMinObj() || ((ObObj) startRow.get(column))
195+
.isMaxObj())) {
196+
return completeWorks;
197+
}
198+
if (endRow.get(column) instanceof ObObj
199+
&& (((ObObj) endRow.get(column)).isMinObj() || ((ObObj) endRow.get(column)).isMaxObj())) {
200+
return completeWorks;
201+
}
202+
}
203+
}
204+
205+
// eval partition key
206+
List<Object> startValues = evalRowKeyValues(startRow);
207+
Object startValue = startValues.get(0);
208+
List<Object> endValues = evalRowKeyValues(endRow);
209+
Object endValue = endValues.get(0);
210+
211+
Long startLongValue = ObObjType.parseToLongOrNull(startValue);
212+
Long endLongValue = ObObjType.parseToLongOrNull(endValue);
213+
214+
if (startLongValue == null || endLongValue == null) {
215+
throw new NumberFormatException("can not parseToComparable start value ["
216+
+ startValue + "] or end value [" + endValue
217+
+ "] to long");
218+
}
219+
long startHashValue = startLongValue - (startInclusive ? 0 : -1);
220+
long endHashValue = endLongValue - (endInclusive ? 0 : 1);
221+
222+
if (endHashValue - startHashValue + 1 >= partNum) {
223+
return completeWorks;
224+
} else {
225+
List<Long> partIds = new ArrayList<Long>();
226+
for (long i = startHashValue; i <= endHashValue; i++) {
227+
partIds.add(innerHash(i));
228+
}
229+
return partIds;
230+
}
231+
} catch (IllegalArgumentException e) {
232+
logger.error(LCD.convert("01-00002"), e);
233+
throw new IllegalArgumentException(
234+
"ObHashPartDesc get part id come across illegal params", e);
235+
}
236+
}
237+
162238
/*
163239
* Get random part id.
164240
*/

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObKeyPartDesc.java

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alipay.oceanbase.rpc.location.model.partition;
1919

2020
import com.alipay.oceanbase.rpc.exception.ObTablePartitionConsistentException;
21+
import com.alipay.oceanbase.rpc.mutation.Row;
2122
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObCollationType;
2223
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObColumn;
2324
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
@@ -125,10 +126,8 @@ public List<Long> getPartIds(Object[] start, boolean startInclusive, Object[] en
125126
}
126127

127128
if (startValues.equals(endValues)) {
128-
List<Object[]> rowKeys = new ArrayList<Object[]>();
129129
List<Long> partIds = new ArrayList<Long>();
130-
rowKeys.add(start);
131-
partIds.add(getPartId(rowKeys, false));
130+
partIds.add(calcPartId(startValues));
132131
return partIds;
133132
} else {
134133
// partition key is different in key partition
@@ -141,6 +140,72 @@ public List<Long> getPartIds(Object[] start, boolean startInclusive, Object[] en
141140
}
142141
}
143142

143+
// get partition ids for query
144+
public List<Long> getPartIds(List<String> scanRangeColumns, Object[] start, boolean startInclusive,
145+
Object[] end, boolean endInclusive) {
146+
147+
try {
148+
if (start.length != end.length) {
149+
throw new IllegalArgumentException("length of start key and end key is not equal");
150+
}
151+
152+
if (start.length == 1 && start[0] instanceof ObObj && ((ObObj) start[0]).isMinObj() &&
153+
end.length == 1 && end[0] instanceof ObObj && ((ObObj) end[0]).isMaxObj()) {
154+
return completeWorks;
155+
}
156+
157+
if (scanRangeColumns.size() != start.length) {
158+
throw new IllegalArgumentException("length of key and scan range columns is not equal");
159+
}
160+
161+
Row startRow = new Row();
162+
Row endRow = new Row();
163+
for (int i = 0; i < scanRangeColumns.size(); i++) {
164+
startRow.add(scanRangeColumns.get(i), start[i]);
165+
endRow.add(scanRangeColumns.get(i), end[i]);
166+
}
167+
168+
// check whether partition key is Min or Max, should refactor after remove addRowkeyElement
169+
for (ObColumn partColumn : partColumns) {
170+
List<String> refColumns = partColumn.getRefColumnNames();
171+
for (String column : refColumns) {
172+
if (startRow.get(column) instanceof ObObj
173+
&& (((ObObj) startRow.get(column)).isMinObj() || ((ObObj) startRow.get(column))
174+
.isMaxObj())) {
175+
return completeWorks;
176+
}
177+
if (endRow.get(column) instanceof ObObj
178+
&& (((ObObj) endRow.get(column)).isMinObj() || ((ObObj) endRow.get(column)).isMaxObj())) {
179+
return completeWorks;
180+
}
181+
}
182+
}
183+
184+
// eval partition key
185+
List<Object> startValues = evalRowKeyValues(startRow);
186+
List<Object> endValues = evalRowKeyValues(endRow);
187+
188+
if (startValues == null || endValues == null) {
189+
throw new NumberFormatException("can not parseToComparable start value ["
190+
+ startValues + "] or end value [" + endValues
191+
+ "] to long");
192+
}
193+
194+
if (startValues.equals(endValues)) {
195+
List<Long> partIds = new ArrayList<Long>();
196+
partIds.add(calcPartId(startValues));
197+
return partIds;
198+
} else {
199+
// partition key is different in key partition
200+
return completeWorks;
201+
}
202+
} catch (IllegalArgumentException e) {
203+
logger.error(LCD.convert("01-00002"), e);
204+
throw new IllegalArgumentException(
205+
"ObKeyPartDesc get part id come across illegal params", e);
206+
}
207+
}
208+
144209
/*
145210
* Get random part id.
146211
*/
@@ -197,23 +262,32 @@ public Long getPartId(List<Object[]> rowKeys, boolean consistency) {
197262
}
198263
}
199264

200-
long hashValue = 0L;
201-
for (int i = 0; i < partRefColumnSize; i++) {
202-
hashValue = ObHashUtils.toHashcode(evalValues.get(i),
203-
orderedPartRefColumnRowKeyRelations.get(i).getLeft(), hashValue,
204-
this.getPartFuncType());
205-
}
206-
207-
hashValue = (hashValue > 0 ? hashValue : -hashValue);
208-
return ((long) partSpace << ObPartConstants.OB_PART_IDS_BITNUM)
209-
| (hashValue % this.partNum);
265+
return calcPartId(evalValues);
210266
} catch (IllegalArgumentException e) {
211267
logger.error(LCD.convert("01-00023"), e);
212268
throw new IllegalArgumentException(
213269
"ObKeyPartDesc get part id come across illegal params", e);
214270
}
215271
}
216272

273+
// calc partition id from eval values
274+
private Long calcPartId(List<Object> evalValues) {
275+
if (evalValues == null || evalValues.size() != orderedPartRefColumnRowKeyRelations.size()) {
276+
throw new IllegalArgumentException("invalid eval values :" + evalValues);
277+
}
278+
279+
long hashValue = 0L;
280+
for (int i = 0; i < orderedPartRefColumnRowKeyRelations.size(); i++) {
281+
hashValue = ObHashUtils.toHashcode(evalValues.get(i),
282+
orderedPartRefColumnRowKeyRelations.get(i).getLeft(), hashValue,
283+
this.getPartFuncType());
284+
}
285+
286+
hashValue = (hashValue > 0 ? hashValue : -hashValue);
287+
return ((long) partSpace << ObPartConstants.OB_PART_IDS_BITNUM)
288+
| (hashValue % this.partNum);
289+
}
290+
217291
private boolean equalsWithCollationType(ObCollationType collationType, Object s, Object t)
218292
throws IllegalArgumentException {
219293
if (collationType == ObCollationType.CS_TYPE_UTF8MB4_GENERAL_CI) {

0 commit comments

Comments
 (0)