Skip to content

Commit 7305dd8

Browse files
committed
set hbase op type to query
1 parent 4939065 commit 7305dd8

File tree

8 files changed

+82
-6
lines changed

8 files changed

+82
-6
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2271,13 +2271,15 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
22712271
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
22722272
((ObTableQueryRequest) request).getTableQuery(), this);
22732273
tableQuery.setEntityType(request.getEntityType());
2274+
tableQuery.setHbaseOpType(request.getHbaseOpType());
22742275
return new ObClusterTableQuery(tableQuery).executeInternal();
22752276
} else if (request instanceof ObTableQueryAsyncRequest) {
22762277
// TableGroup -> TableName
22772278
String tableName = request.getTableName();
22782279
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
22792280
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
22802281
tableQuery.setEntityType(request.getEntityType());
2282+
tableQuery.setHbaseOpType(request.getHbaseOpType());
22812283
ObClusterTableQuery clusterTableQuery = new ObClusterTableQuery(tableQuery);
22822284
clusterTableQuery.setAllowDistributeScan(((ObTableQueryAsyncRequest) request).isAllowDistributeScan());
22832285
return clusterTableQuery.asyncExecuteInternal();

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/OHOperationType.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
118
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
219

320
import java.util.*;
@@ -47,4 +64,37 @@ public int getValue() {
4764
public byte getByteValue() {
4865
return (byte) value;
4966
}
67+
68+
/*
69+
* CHECK_AND_PUT -> checkAndPut
70+
* PUT -> put
71+
*/
72+
public String toCamelCase() {
73+
String name = this.name();
74+
if (name == null || name.isEmpty()) {
75+
return name;
76+
}
77+
78+
String[] parts = name.split("_");
79+
StringBuilder sb = new StringBuilder();
80+
81+
for (int i = 0; i < parts.length; i++) {
82+
String part = parts[i];
83+
if (part == null || part.isEmpty()) {
84+
continue;
85+
}
86+
87+
if (i == 0) {
88+
sb.append(part.toLowerCase());
89+
} else {
90+
if (!part.isEmpty()) {
91+
sb.append(Character.toUpperCase(part.charAt(0)));
92+
if (part.length() > 1) {
93+
sb.append(part.substring(1).toLowerCase());
94+
}
95+
}
96+
}
97+
}
98+
return sb.toString();
99+
}
50100
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
option_flag_,
3939
op_type_,
4040
keys_,
41-
cf_rows_);
41+
cf_rows_,
42+
hbase_op_type_);
4243
*/
4344
/*
4445
[k1][k2][k3]...
@@ -53,7 +54,7 @@ public class ObHbaseRequest extends AbstractPayload implements Credentialable {
5354
protected ObTableOperationType opType;
5455
protected List<ObObj> keys = new ArrayList<>();
5556
protected List<ObHbaseCfRows> cfRows;
56-
protected OHOperationType hbaseOpType;
57+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
5758

5859
public ObHbaseRequest() {
5960
this.credential = new ObBytesString();
@@ -113,6 +114,9 @@ public byte[] encode() {
113114
ObHbaseCfRows sameCfRows = cfRows.get(i);
114115
sameCfRows.encode(buf);
115116
}
117+
118+
// 7. encode hbase op type, to differentiate put and put list
119+
Serialization.encodeI8(buf, hbaseOpType.getByteValue());
116120

117121
if (buf.pos != buf.bytes.length) {
118122
throw new IllegalArgumentException("error in encode ObHbaseRequest (" +
@@ -152,6 +156,7 @@ public long getPayloadContentSize() {
152156
for (ObHbaseCfRows cfRows : cfRows) {
153157
payLoadContentSize += cfRows.getPayloadSize();
154158
}
159+
payLoadContentSize += 1; // hbase_op_type_
155160
}
156161
return payLoadContentSize;
157162
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,10 @@ public void setHbaseOpType(OHOperationType hbaseOpType) {
225225
this.hbaseOpType = hbaseOpType;
226226
}
227227

228+
public OHOperationType getHbaseOpType() {
229+
return hbaseOpType;
230+
}
231+
228232
public boolean getNeedTabletId() {
229233
return option_flag.isNeedTabletId();
230234
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,8 @@
3030
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
3131
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
3232
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
33-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
33+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
3434
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
35-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
36-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableStreamRequest;
37-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.QueryStreamResult;
3835
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
3936
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult;
4037
import com.alipay.oceanbase.rpc.table.ObTable;
@@ -65,6 +62,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
6562
// global index: key is index table name (be like: __idx_<data_table_id>_<index_name>)
6663
protected String indexTableName;
6764
protected ObTableEntityType entityType;
65+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
6866
protected Map<Long, ObPair<Long, ObTableParam>> expectant;
6967
protected List<String> cacheProperties = new LinkedList<String>();
7068
protected LinkedList<List<ObObj>> cacheRows = new LinkedList<List<ObObj>>();
@@ -832,4 +830,12 @@ public ObTableClient getClient() {
832830
public void setClient(ObTableClient client) {
833831
this.client = client;
834832
}
833+
834+
public OHOperationType getHbaseOpType() {
835+
return hbaseOpType;
836+
}
837+
838+
public void setHbaseOpType(OHOperationType hbaseOpType) {
839+
this.hbaseOpType = hbaseOpType;
840+
}
835841
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public void init() throws Exception {
5858
request.setTableQuery(tableQuery);
5959
request.setEntityType(entityType);
6060
request.setConsistencyLevel(getReadConsistency().toObTableConsistencyLevel());
61+
request.setHbaseOpType(hbaseOpType);
6162

6263
// construct async query request
6364
asyncRequest.setObTableQueryRequest(request);

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ protected ObTableQueryResult referToNewPartition(ObPair<Long, ObTableParam> part
5050
request.setPartitionId(partitionId);
5151
request.setTableId(partIdWithObTable.getRight().getTableId());
5252
request.setEntityType(entityType);
53+
request.setHbaseOpType(hbaseOpType);
5354
if (operationTimeout > 0) {
5455
request.setTimeout(operationTimeout);
5556
} else {

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2929
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
3030
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
31+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.OHOperationType;
3132
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
3233
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
3334
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
@@ -53,6 +54,7 @@ public class ObTableClientQueryImpl extends AbstractTableQueryImpl {
5354
private Row rowKey; // only used by BatchOperation
5455

5556
private boolean allowDistributeScan = true;
57+
private OHOperationType hbaseOpType = OHOperationType.INVALID;
5658

5759
/*
5860
* Add aggregation.
@@ -162,6 +164,7 @@ private void setCommonParams2Result(AbstractQueryStreamResult result) throws Exc
162164
result.setExpectant(partitionObTables);
163165
result.setOperationTimeout(operationTimeout);
164166
result.setReadConsistency(obTableClient.getReadConsistency());
167+
result.setHbaseOpType(hbaseOpType);
165168
}
166169

167170
private abstract static class InitQueryResultCallback<T> {
@@ -445,4 +448,8 @@ public Long getPartId() {
445448
public void setAllowDistributeScan(boolean allowDistributeScan) {
446449
this.allowDistributeScan = allowDistributeScan;
447450
}
451+
452+
public void setHbaseOpType(OHOperationType hbaseOpType) {
453+
this.hbaseOpType = hbaseOpType;
454+
}
448455
}

0 commit comments

Comments
 (0)