Skip to content

Commit de4dc74

Browse files
committed
init partial serialization and query setting
1 parent 58e8bdb commit de4dc74

File tree

3 files changed

+120
-35
lines changed

3 files changed

+120
-35
lines changed

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java

Lines changed: 90 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,22 @@
2020
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2121
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2222
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil;
23+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter;
2324
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObScanOrder;
26+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
27+
import com.alipay.oceanbase.rpc.table.ObKVParams;
2428
import com.alipay.oceanbase.rpc.util.Serialization;
2529
import io.netty.buffer.ByteBuf;
2630

2731
import java.util.*;
2832

29-
public class ObTableSingleOpQuery extends AbstractPayload {
30-
private String indexName;
33+
public class ObTableSingleOpQuery extends ObTableQuery {
3134
private List<String> scanRangeColumns = new ArrayList<>();
3235
private byte[] scanRangeBitMap = null;
3336
private long scanRangeBitLen = 0;
3437
private List<String> aggColumnNames = new ArrayList<>();
3538

36-
private List<ObNewRange> scanRanges = new ArrayList<>();
37-
38-
private String filterString;
39-
4039
/*
4140
* Encode.
4241
*/
@@ -63,10 +62,10 @@ public byte[] encode() {
6362
}
6463

6564
// 3. encode scan ranges
66-
len = Serialization.getNeedBytes(scanRanges.size());
67-
System.arraycopy(Serialization.encodeVi64(scanRanges.size()), 0, bytes, idx, len);
65+
len = Serialization.getNeedBytes(keyRanges.size());
66+
System.arraycopy(Serialization.encodeVi64(keyRanges.size()), 0, bytes, idx, len);
6867
idx += len;
69-
for (ObNewRange range : scanRanges) {
68+
for (ObNewRange range : keyRanges) {
7069
len = ObTableSerialUtil.getEncodedSize(range);
7170
System.arraycopy(ObTableSerialUtil.encode(range), 0, bytes, idx, len);
7271
idx += len;
@@ -77,6 +76,39 @@ public byte[] encode() {
7776
System.arraycopy(Serialization.encodeVString(filterString), 0, bytes, idx, len);
7877
idx += len;
7978

79+
// selectColumns + scanOrder + hTableFilter + obKVParams
80+
81+
len = Serialization.getNeedBytes(selectColumns.size());
82+
System.arraycopy(Serialization.encodeVi64(selectColumns.size()), 0, bytes, idx, len);
83+
idx += len;
84+
for (String selectColumn : selectColumns) {
85+
len = Serialization.getNeedBytes(selectColumn);
86+
System.arraycopy(Serialization.encodeVString(selectColumn), 0, bytes, idx, len);
87+
idx += len;
88+
}
89+
90+
System.arraycopy(Serialization.encodeI8(scanOrder.getByteValue()), 0, bytes, idx, 1);
91+
idx += 1;
92+
93+
if (isHbaseQuery) {
94+
len = (int) hTableFilter.getPayloadSize();
95+
System.arraycopy(hTableFilter.encode(), 0, bytes, idx, len);
96+
} else {
97+
len = HTABLE_DUMMY_BYTES.length;
98+
System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len);
99+
}
100+
idx += len;
101+
102+
if (isHbaseQuery && obKVParams != null) {
103+
len = (int) obKVParams.getPayloadSize();
104+
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
105+
idx += len;
106+
} else {
107+
len = HTABLE_DUMMY_BYTES.length;
108+
System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len);
109+
idx += len;
110+
}
111+
80112
return bytes;
81113
}
82114

@@ -110,7 +142,7 @@ public Object decode(ByteBuf buf) {
110142
for (int i = 0; i < len; i++) {
111143
ObNewRange range = new ObNewRange();
112144
ObTableSerialUtil.decode(buf, range);
113-
scanRanges.add(range);
145+
keyRanges.add(range);
114146
}
115147

116148
// 4. decode filter string
@@ -129,13 +161,32 @@ public long getPayloadContentSize() {
129161
payloadContentSize += Serialization.getNeedBytes(scanRangeBitLen);
130162
payloadContentSize += scanRangeBitMap.length;
131163

132-
payloadContentSize += Serialization.getNeedBytes(scanRanges.size());
133-
for (ObNewRange range : scanRanges) {
164+
payloadContentSize += Serialization.getNeedBytes(keyRanges.size());
165+
for (ObNewRange range : keyRanges) {
134166
payloadContentSize += ObTableSerialUtil.getEncodedSize(range);
135167
}
136168

137-
return payloadContentSize + Serialization.getNeedBytes(indexName)
138-
+ Serialization.getNeedBytes(filterString);
169+
payloadContentSize += Serialization.getNeedBytes(indexName);
170+
payloadContentSize += Serialization.getNeedBytes(filterString);
171+
172+
payloadContentSize += Serialization.getNeedBytes(selectColumns.size());
173+
for (String selectColumn : selectColumns) {
174+
payloadContentSize += Serialization.getNeedBytes(selectColumn);
175+
}
176+
payloadContentSize += 1; // scanOrder
177+
178+
if (isHbaseQuery) {
179+
payloadContentSize += hTableFilter.getPayloadSize();
180+
} else {
181+
payloadContentSize += HTABLE_DUMMY_BYTES.length;
182+
}
183+
if (isHbaseQuery && obKVParams != null) {
184+
payloadContentSize += obKVParams.getPayloadSize();
185+
} else {
186+
payloadContentSize += HTABLE_DUMMY_BYTES.length;
187+
}
188+
189+
return payloadContentSize;
139190
}
140191

141192
// Support class, which is used for column name sorted
@@ -181,7 +232,7 @@ public void adjustScanRangeColumns(Map<String, Long> columnNameIdxMap) {
181232

182233
Collections.sort(pairs);
183234

184-
for (ObNewRange range : scanRanges) {
235+
for (ObNewRange range : keyRanges) {
185236
List<ObObj> startKey= range.getStartKey().getObjs();
186237
List<ObObj> endKey= range.getStartKey().getObjs();
187238
List<ObObj> adjustStartKey = new ArrayList<>(startKey.size());
@@ -199,15 +250,15 @@ public void adjustScanRangeColumns(Map<String, Long> columnNameIdxMap) {
199250
}
200251

201252
public List<ObNewRange> getScanRanges() {
202-
return scanRanges;
253+
return keyRanges;
203254
}
204255

205256
public void setScanRanges(List<ObNewRange> scanRanges) {
206-
this.scanRanges = scanRanges;
257+
this.keyRanges = scanRanges;
207258
}
208259

209260
public void addScanRange(ObNewRange scanRange) {
210-
this.scanRanges.add(scanRange);
261+
this.keyRanges.add(scanRange);
211262
}
212263

213264
public void addScanRangeColumns(List<String> scanRangeColumns) {
@@ -229,4 +280,25 @@ public List<String> getScanRangeColumns() {
229280
public void setAggColumnNames(List<String> columnNames) {
230281
this.aggColumnNames = columnNames;
231282
}
283+
284+
public static ObTableSingleOpQuery getInstance(String indexName,
285+
List<ObNewRange> keyRanges,
286+
List<String> selectColumns,
287+
ObScanOrder scanOrder,
288+
boolean isHbaseQuery,
289+
ObHTableFilter obHTableFilter,
290+
ObKVParams obKVParams,
291+
String filterString) {
292+
ObTableSingleOpQuery query = new ObTableSingleOpQuery();
293+
query.setIndexName(indexName);
294+
query.setScanRanges(keyRanges);
295+
query.setSelectColumns(selectColumns);
296+
query.setScanOrder(scanOrder);
297+
if (isHbaseQuery) {
298+
query.sethTableFilter(obHTableFilter);
299+
query.setObKVParams(obKVParams);
300+
}
301+
query.setFilterString(filterString);
302+
return query;
303+
}
232304
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,26 @@
5555
*/
5656
public class ObTableQuery extends AbstractPayload {
5757

58-
private List<ObNewRange> keyRanges = new LinkedList<ObNewRange>();
59-
private List<String> selectColumns = new LinkedList<String>();
60-
private String filterString;
61-
private int limit = -1;
62-
private int offset = 0;
63-
private ObScanOrder scanOrder = ObScanOrder.Forward;
64-
private String indexName;
65-
private int batchSize = -1;
66-
private long maxResultSize = -1;
67-
private ObHTableFilter hTableFilter;
68-
69-
private static final byte[] HTABLE_DUMMY_BYTES = new byte[] { 0x01, 0x00 };
70-
private boolean isHbaseQuery = false;
71-
private List<String> scanRangeColumns = new LinkedList<String>();
72-
73-
private List<ObTableAggregationSingle> aggregations = new LinkedList<>();
58+
protected List<ObNewRange> keyRanges = new LinkedList<ObNewRange>();
59+
protected List<String> selectColumns = new LinkedList<String>();
60+
protected String filterString;
61+
protected int limit = -1;
62+
protected int offset = 0;
63+
protected ObScanOrder scanOrder = ObScanOrder.Forward;
64+
protected String indexName;
65+
protected int batchSize = -1;
66+
protected long maxResultSize = -1;
67+
protected ObHTableFilter hTableFilter;
68+
69+
protected static final byte[] HTABLE_DUMMY_BYTES = new byte[] { 0x01, 0x00 };
70+
protected boolean isHbaseQuery = false;
71+
protected List<String> scanRangeColumns = new LinkedList<String>();
72+
73+
protected List<ObTableAggregationSingle> aggregations = new LinkedList<>();
7474

7575
private Long partId = null;
7676

77-
private ObKVParams obKVParams;
77+
protected ObKVParams obKVParams;
7878

7979
public void adjustStartKey(List<ObObj> key) throws IllegalArgumentException {
8080
List<ObNewRange> keyRanges = getKeyRanges();

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
3434
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
3535
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
36+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
3637
import com.alipay.oceanbase.rpc.threadlocal.ThreadLocalMap;
3738
import com.alipay.oceanbase.rpc.util.MonitorUtil;
3839
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
@@ -197,6 +198,18 @@ public void addOperation(TableQuery query) throws Exception {
197198
needAllProp = true;
198199
}
199200
ObTableSingleOp singleOp = new ObTableSingleOp();
201+
if (entityType == ObTableEntityType.HKV) {
202+
ObTableQuery obTableQuery = query.getObTableQuery();
203+
ObTableSingleOpQuery singleOpQuery = ObTableSingleOpQuery.getInstance(obTableQuery.getIndexName(),
204+
obTableQuery.getKeyRanges(),
205+
obTableQuery.getSelectColumns(),
206+
obTableQuery.getScanOrder(),
207+
obTableQuery.isHbaseQuery(),
208+
obTableQuery.gethTableFilter(),
209+
obTableQuery.getObKVParams(),
210+
obTableQuery.getFilterString());
211+
singleOp.setQuery(singleOpQuery);
212+
}
200213
singleOp.setSingleOpType(ObTableOperationType.GET);
201214
singleOp.addEntity(entity);
202215
addOperation(singleOp);

0 commit comments

Comments
 (0)