Skip to content

Commit 5aa84e7

Browse files
authored
Hbase Batch Get Support (#269)
* init partial serialization and query setting * add get methods * fix bug * fix multi cf polute query * add version control for batch get * fix compatibility problem * change hbase batch get support version from 4251 to 4252 * use correct tableName if using index or hbase tableGroup * do compatibility to old server, remove useless comments * remove useless import * add CellTTL version control * review comments * add version control for batch put and delete * set isReadOnly for hbase get
1 parent 58e8bdb commit 5aa84e7

File tree

14 files changed

+212
-68
lines changed

14 files changed

+212
-68
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,30 @@ public static boolean isReturnOneResultSupport() {
9191
|| OB_VERSION >= OB_VERSION_4_3_4_0;
9292
}
9393

94+
public static boolean isHBaseBatchGetSupport() {
95+
return OB_VERSION >= OB_VERSION_4_2_5_2 && OB_VERSION < OB_VERSION_4_3_0_0;
96+
}
97+
98+
public static boolean isHBaseBatchSupport() {
99+
return OB_VERSION >= OB_VERSION_4_2_5_2 && OB_VERSION < OB_VERSION_4_3_0_0
100+
|| OB_VERSION >= OB_VERSION_4_3_5_0;
101+
}
102+
103+
public static boolean isCellTTLSupport() {
104+
return OB_VERSION >= OB_VERSION_4_3_5_1;
105+
}
106+
94107
public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);
95108

109+
public static final long OB_VERSION_4_2_5_2 = calcVersion(4, (short) 2, (byte) 5, (byte) 2);
110+
96111
public static final long OB_VERSION_4_3_0_0 = calcVersion(4, (short) 3, (byte) 0, (byte) 0);
97112

98113
public static final long OB_VERSION_4_3_4_0 = calcVersion(4, (short) 3, (byte) 4, (byte) 0);
99114

115+
public static final long OB_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0);
116+
117+
public static final long OB_VERSION_4_3_5_1 = calcVersion(4, (short) 3, (byte) 5, (byte) 1);
118+
100119
public static long OB_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
101120
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/protocol/v0/ObDirectLoadProtocolV0.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ public void checkIsSupported(ObDirectLoadStatement statement) throws ObDirectLoa
6969
logger.warn("partition names in ob version " + ObGlobal.getObVsnString(obVersion)
7070
+ "is not supported, minimum version required is "
7171
+ ObGlobal.getObVsnString(OB_VERSION_4_3_5_0));
72-
throw new ObDirectLoadNotSupportedException("partition names in ob version "
73-
+ ObGlobal.getObVsnString(obVersion)
74-
+ " is not supported, minimum version required is "
75-
+ ObGlobal.getObVsnString(OB_VERSION_4_3_5_0));
72+
throw new ObDirectLoadNotSupportedException(
73+
"partition names in ob version " + ObGlobal.getObVsnString(obVersion)
74+
+ " is not supported, minimum version required is "
75+
+ ObGlobal.getObVsnString(OB_VERSION_4_3_5_0));
7676
}
7777
}
7878

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,23 @@ public BatchOperation setTable(String tableName) {
8989
* add queries
9090
*/
9191
public BatchOperation addOperation(TableQuery... queries) {
92-
if (isSameType && lastType != ObTableOperationType.INVALID
93-
&& lastType != ObTableOperationType.GET) {
94-
isSameType = false;
92+
boolean isHBaseQuery = false;
93+
if (queries != null && queries.length > 0) {
94+
isHBaseQuery = queries[0].getObTableQuery().isHbaseQuery();
95+
}
96+
if (isHBaseQuery) {
97+
if (isSameType && lastType != ObTableOperationType.INVALID
98+
&& lastType != ObTableOperationType.SCAN) {
99+
isSameType = false;
100+
}
101+
lastType = ObTableOperationType.SCAN;
102+
} else {
103+
if (isSameType && lastType != ObTableOperationType.INVALID
104+
&& lastType != ObTableOperationType.GET) {
105+
isSameType = false;
106+
}
107+
lastType = ObTableOperationType.GET;
95108
}
96-
97-
lastType = ObTableOperationType.GET;
98109
this.operations.addAll(Arrays.asList(queries));
99110
return this;
100111
}
@@ -104,7 +115,7 @@ public BatchOperation addOperation(TableQuery... queries) {
104115
*/
105116
public BatchOperation addOperation(Get... gets) {
106117
if (isSameType && lastType != ObTableOperationType.INVALID
107-
&& lastType != ObTableOperationType.GET) {
118+
&& lastType != ObTableOperationType.GET) {
108119
isSameType = false;
109120
}
110121

@@ -197,6 +208,7 @@ private BatchOperationResult executeWithNormalBatchOp() throws Exception {
197208
throw new IllegalArgumentException("table name is null");
198209
}
199210
TableBatchOps batchOps = client.batch(tableName);
211+
batchOps.setEntityType(entityType);
200212
boolean hasSetRowkeyElement = false;
201213

202214
for (Object operation : operations) {
@@ -276,7 +288,6 @@ private BatchOperationResult executeWithNormalBatchOp() throws Exception {
276288
throw new ObTableException("unknown operation " + operation);
277289
}
278290
}
279-
batchOps.setEntityType(entityType);
280291
batchOps.setAtomicOperation(isAtomic);
281292
batchOps.setReturnOneResult(returnOneResult);
282293
return new BatchOperationResult(batchOps.executeWithResult());
@@ -292,6 +303,7 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
292303

293304
if (client instanceof ObTableClient) {
294305
batchOps = new ObTableClientLSBatchOpsImpl(tableName, (ObTableClient) client);
306+
batchOps.setEntityType(entityType);
295307
for (Object operation : operations) {
296308
if (operation instanceof CheckAndInsUp) {
297309
checkAndInsUpCnt++;
@@ -342,7 +354,6 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
342354
batchOps.setReturningAffectedEntity(withResult);
343355
batchOps.setReturnOneResult(returnOneResult);
344356
batchOps.setAtomicOperation(isAtomic);
345-
batchOps.setEntityType(entityType);
346357
return new BatchOperationResult(batchOps.executeWithResult());
347358
}
348359

src/main/java/com/alipay/oceanbase/rpc/mutation/result/MutationResult.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,8 @@ public Row getOperationRow() {
8787
}
8888
return new Row(rowsMap);
8989
}
90+
91+
public ObPayload getResult() {
92+
return this.result;
93+
}
9094
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ public void addTabletOperation(ObTableTabletOp tabletOp) {
121121
public void setLsOperation(ObTableLSOperation lsOperation) {
122122
this.lsOperation = lsOperation;
123123
}
124-
124+
125125
/*
126126
* Get entity type.
127127
*/
128128
public ObTableEntityType getEntityType() {
129129
return entityType;
130130
}
131-
131+
132132
/*
133133
* Set entity type.
134134
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public enum ObTableOperationType {
4242
false, // REPLACE
4343
false, // INCREMENT
4444
false, // APPEND
45-
false, // SCAN
45+
true, // SCAN
4646
false, // TTL
4747
true, // CHECK_AND_INSERT_UP
4848
false, // PUT

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,11 @@ public void addEntity(ObTableSingleOpEntity entity) {
163163
public List<ObObj> getRowkeyObjs() {
164164
List<ObObj> rowkeyObjs;
165165
if (singleOpType == ObTableOperationType.SCAN) {
166-
throw new IllegalArgumentException("can not get rowkey from scan operation");
166+
if (query.isHbaseQuery()) {
167+
rowkeyObjs = entities.get(0).getRowkey();
168+
} else {
169+
throw new IllegalArgumentException("can not get rowkey from scan operation");
170+
}
167171
} else if (singleOpType == ObTableOperationType.CHECK_AND_INSERT_UP) {
168172
rowkeyObjs = getScanRange().get(0).getStartKey().getObjs();
169173
} else {

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,4 +387,8 @@ public void setRowkey(List<ObObj> rowkey) {
387387
this.rowkey = rowkey;
388388
}
389389

390+
public List<ObObj> getPropertiesValues() {
391+
return this.propertiesValues;
392+
}
393+
390394
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java

Lines changed: 93 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,25 @@
1717

1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
1919

20-
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
20+
import com.alipay.oceanbase.rpc.ObGlobal;
2121
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2222
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil;
23+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter;
2324
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObScanOrder;
26+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
27+
import com.alipay.oceanbase.rpc.table.ObKVParams;
2428
import com.alipay.oceanbase.rpc.util.Serialization;
2529
import io.netty.buffer.ByteBuf;
2630

2731
import java.util.*;
2832

29-
public class ObTableSingleOpQuery extends AbstractPayload {
30-
private String indexName;
33+
public class ObTableSingleOpQuery extends ObTableQuery {
3134
private List<String> scanRangeColumns = new ArrayList<>();
3235
private byte[] scanRangeBitMap = null;
3336
private long scanRangeBitLen = 0;
3437
private List<String> aggColumnNames = new ArrayList<>();
3538

36-
private List<ObNewRange> scanRanges = new ArrayList<>();
37-
38-
private String filterString;
39-
4039
/*
4140
* Encode.
4241
*/
@@ -63,10 +62,10 @@ public byte[] encode() {
6362
}
6463

6564
// 3. encode scan ranges
66-
len = Serialization.getNeedBytes(scanRanges.size());
67-
System.arraycopy(Serialization.encodeVi64(scanRanges.size()), 0, bytes, idx, len);
65+
len = Serialization.getNeedBytes(keyRanges.size());
66+
System.arraycopy(Serialization.encodeVi64(keyRanges.size()), 0, bytes, idx, len);
6867
idx += len;
69-
for (ObNewRange range : scanRanges) {
68+
for (ObNewRange range : keyRanges) {
7069
len = ObTableSerialUtil.getEncodedSize(range);
7170
System.arraycopy(ObTableSerialUtil.encode(range), 0, bytes, idx, len);
7271
idx += len;
@@ -77,6 +76,34 @@ public byte[] encode() {
7776
System.arraycopy(Serialization.encodeVString(filterString), 0, bytes, idx, len);
7877
idx += len;
7978

79+
// encode HBase Batch Get required
80+
if (isHbaseQuery && ObGlobal.isHBaseBatchGetSupport()) {
81+
len = Serialization.getNeedBytes(selectColumns.size());
82+
System.arraycopy(Serialization.encodeVi64(selectColumns.size()), 0, bytes, idx, len);
83+
idx += len;
84+
for (String selectColumn : selectColumns) {
85+
len = Serialization.getNeedBytes(selectColumn);
86+
System.arraycopy(Serialization.encodeVString(selectColumn), 0, bytes, idx, len);
87+
idx += len;
88+
}
89+
90+
System.arraycopy(Serialization.encodeI8(scanOrder.getByteValue()), 0, bytes, idx, 1);
91+
idx += 1;
92+
93+
len = (int) hTableFilter.getPayloadSize();
94+
System.arraycopy(hTableFilter.encode(), 0, bytes, idx, len);
95+
idx += len;
96+
97+
if (obKVParams != null) {
98+
len = (int) obKVParams.getPayloadSize();
99+
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
100+
idx += len;
101+
} else {
102+
len = HTABLE_DUMMY_BYTES.length;
103+
System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len);
104+
idx += len;
105+
}
106+
}
80107
return bytes;
81108
}
82109

@@ -110,7 +137,7 @@ public Object decode(ByteBuf buf) {
110137
for (int i = 0; i < len; i++) {
111138
ObNewRange range = new ObNewRange();
112139
ObTableSerialUtil.decode(buf, range);
113-
scanRanges.add(range);
140+
keyRanges.add(range);
114141
}
115142

116143
// 4. decode filter string
@@ -129,13 +156,35 @@ public long getPayloadContentSize() {
129156
payloadContentSize += Serialization.getNeedBytes(scanRangeBitLen);
130157
payloadContentSize += scanRangeBitMap.length;
131158

132-
payloadContentSize += Serialization.getNeedBytes(scanRanges.size());
133-
for (ObNewRange range : scanRanges) {
159+
payloadContentSize += Serialization.getNeedBytes(keyRanges.size());
160+
for (ObNewRange range : keyRanges) {
134161
payloadContentSize += ObTableSerialUtil.getEncodedSize(range);
135162
}
136163

137-
return payloadContentSize + Serialization.getNeedBytes(indexName)
138-
+ Serialization.getNeedBytes(filterString);
164+
payloadContentSize += Serialization.getNeedBytes(indexName);
165+
payloadContentSize += Serialization.getNeedBytes(filterString);
166+
167+
// calculate part required by HBase Batch Get
168+
if (isHbaseQuery && ObGlobal.isHBaseBatchGetSupport()) {
169+
payloadContentSize += Serialization.getNeedBytes(selectColumns.size());
170+
for (String selectColumn : selectColumns) {
171+
payloadContentSize += Serialization.getNeedBytes(selectColumn);
172+
}
173+
payloadContentSize += 1; // scanOrder
174+
175+
if (isHbaseQuery) {
176+
payloadContentSize += hTableFilter.getPayloadSize();
177+
} else {
178+
payloadContentSize += HTABLE_DUMMY_BYTES.length;
179+
}
180+
if (isHbaseQuery && obKVParams != null) {
181+
payloadContentSize += obKVParams.getPayloadSize();
182+
} else {
183+
payloadContentSize += HTABLE_DUMMY_BYTES.length;
184+
}
185+
}
186+
187+
return payloadContentSize;
139188
}
140189

141190
// Support class, which is used for column name sorted
@@ -181,7 +230,7 @@ public void adjustScanRangeColumns(Map<String, Long> columnNameIdxMap) {
181230

182231
Collections.sort(pairs);
183232

184-
for (ObNewRange range : scanRanges) {
233+
for (ObNewRange range : keyRanges) {
185234
List<ObObj> startKey= range.getStartKey().getObjs();
186235
List<ObObj> endKey= range.getStartKey().getObjs();
187236
List<ObObj> adjustStartKey = new ArrayList<>(startKey.size());
@@ -191,23 +240,25 @@ public void adjustScanRangeColumns(Map<String, Long> columnNameIdxMap) {
191240
adjustStartKey.add(startKey.get((int) pair.origin_idx));
192241
adjustEndtKey.add(endKey.get((int) pair.origin_idx));
193242
}
194-
range.getStartKey().setObjs(adjustStartKey);
195-
range.getEndKey().setObjs(adjustEndtKey);
243+
if (!adjustStartKey.isEmpty() && !adjustEndtKey.isEmpty()) {
244+
range.getStartKey().setObjs(adjustStartKey);
245+
range.getEndKey().setObjs(adjustEndtKey);
246+
}
196247
}
197248

198249
this.scanRangeBitMap = byteArray;
199250
}
200251

201252
public List<ObNewRange> getScanRanges() {
202-
return scanRanges;
253+
return keyRanges;
203254
}
204255

205256
public void setScanRanges(List<ObNewRange> scanRanges) {
206-
this.scanRanges = scanRanges;
257+
this.keyRanges = scanRanges;
207258
}
208259

209260
public void addScanRange(ObNewRange scanRange) {
210-
this.scanRanges.add(scanRange);
261+
this.keyRanges.add(scanRange);
211262
}
212263

213264
public void addScanRangeColumns(List<String> scanRangeColumns) {
@@ -229,4 +280,25 @@ public List<String> getScanRangeColumns() {
229280
public void setAggColumnNames(List<String> columnNames) {
230281
this.aggColumnNames = columnNames;
231282
}
283+
284+
public static ObTableSingleOpQuery getInstance(String indexName,
285+
List<ObNewRange> keyRanges,
286+
List<String> selectColumns,
287+
ObScanOrder scanOrder,
288+
boolean isHbaseQuery,
289+
ObHTableFilter obHTableFilter,
290+
ObKVParams obKVParams,
291+
String filterString) {
292+
ObTableSingleOpQuery query = new ObTableSingleOpQuery();
293+
query.setIndexName(indexName);
294+
query.setScanRanges(keyRanges);
295+
query.setSelectColumns(selectColumns);
296+
query.setScanOrder(scanOrder);
297+
if (isHbaseQuery) {
298+
query.sethTableFilter(obHTableFilter);
299+
query.setObKVParams(obKVParams);
300+
}
301+
query.setFilterString(filterString);
302+
return query;
303+
}
232304
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOp.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,12 @@ public void setSingleOperations(List<ObTableSingleOp> singleOperations) {
155155
}
156156
}
157157

158-
if (isSameType() && singleOperations.get(0).getSingleOpType() == ObTableOperationType.GET) {
159-
setIsReadOnly(true);
158+
if (isSameType()) {
159+
boolean isHbaseOps = singleOperations.get(0).getQuery().isHbaseQuery();
160+
if ((isHbaseOps && singleOperations.get(0).getSingleOpType() == ObTableOperationType.SCAN)
161+
|| (!isHbaseOps && singleOperations.get(0).getSingleOpType() == ObTableOperationType.GET)) {
162+
setIsReadOnly(true);
163+
}
160164
}
161165
this.singleOperations = singleOperations;
162166
}

0 commit comments

Comments
 (0)