Skip to content

Commit fb6aa87

Browse files
Hbase put perf (#368)
* new hbase protocal (#363) * new hbase request and result for multi-cf * odp support new hbase protocol * order table_name when get by tablegroup --------- Co-authored-by: vanson <[email protected]>
1 parent 398e55a commit fb6aa87

File tree

10 files changed

+702
-6
lines changed

10 files changed

+702
-6
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ public static boolean isCellTTLSupport() {
121121
return OB_VERSION >= OB_VERSION_4_3_5_1;
122122
}
123123

124+
public static boolean isHBasePutPerfSupport() {
125+
return OB_VERSION >= OB_VERSION_4_4_1_0;
126+
}
127+
128+
/*-------------------------------------------- OB_VERSION --------------------------------------------*/
129+
124130
public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);
125131

126132
public static final long OB_VERSION_4_2_5_2 = calcVersion(4, (short) 2, (byte) 5, (byte) 2);
@@ -139,5 +145,15 @@ public static boolean isCellTTLSupport() {
139145

140146
public static final long OB_VERSION_4_4_0_0 = calcVersion(4, (short) 4, (byte) 0, (byte) 0);
141147

148+
public static final long OB_VERSION_4_4_1_0 = calcVersion(4, (short) 4, (byte) 1, (byte) 0);
149+
142150
public static long OB_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
151+
152+
/*-------------------------------------------- OB_PROXY_VERSION --------------------------------------------*/
153+
154+
public static final long OB_PROXY_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0);
155+
156+
public static final long OB_PROXY_VERSION_4_3_6_0 = calcVersion(4, (short) 3, (byte) 6, (byte) 0);
157+
158+
public static long OB_PROXY_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
143159
}

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2386,6 +2386,26 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23862386
+ "is not supported. make sure the correct version");
23872387
}
23882388

2389+
public ObPayload execute(final ObHbaseRequest request) throws Exception {
2390+
if (request.getTableName() == null || request.getTableName().isEmpty()) {
2391+
throw new IllegalArgumentException("table name is null");
2392+
}
2393+
if (isOdpMode()) {
2394+
return getOdpTable().execute(request);
2395+
} else {
2396+
Row row = new Row();
2397+
// get the first cell from the first cfRows to route
2398+
String realTableName = request.getCfRows().get(0).getRealTableName();
2399+
int keyIdx = request.getCfRows().get(0).getKeyIndex(0);
2400+
row.add("K", request.getKeys().get(keyIdx).getValue());
2401+
row.add("Q", request.getCfRows().get(0).getCells().get(0).getQ().getValue());
2402+
row.add("T", request.getCfRows().get(0).getCells().get(0).getT().getValue());
2403+
ObTableParam tableParam = tableRoute.getTableParam(realTableName, row);
2404+
ObTable obTable = tableParam.getObTable();
2405+
return executeWithRetry(obTable, request, request.getTableName());
2406+
}
2407+
}
2408+
23892409
private ObTableQueryAndMutate buildObTableQueryAndMutate(ObTableQuery obTableQuery,
23902410
ObTableBatchOperation obTableBatchOperation) {
23912411
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
2727
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
2828
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObFetchPartitionMetaResult;
29+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObHbaseResult;
2930
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
3031
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
3132
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
@@ -35,6 +36,7 @@
3536
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult;
3637
import com.alipay.remoting.CommandCode;
3738

39+
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_HBASE_EXECUTE;
3840
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_META_INFO_EXECUTE;
3941

4042
public enum ObTablePacketCode implements CommandCode {
@@ -143,6 +145,12 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
143145
public ObPayload newPayload(ObRpcPacketHeader header) {
144146
return new ObTableMetaResponse();
145147
}
148+
},
149+
OB_TABLE_API_HBASE_EXECUTE(Pcodes.OB_TABLE_API_HBASE_EXECUTE) {
150+
@Override
151+
public ObPayload newPayload(ObRpcPacketHeader header) {
152+
return new ObHbaseResult();
153+
}
146154
};
147155

148156
private short value;
@@ -185,8 +193,10 @@ public static ObTablePacketCode valueOf(short value) {
185193
return OB_TABLE_API_MOVE;
186194
case Pcodes.OB_ERROR_PACKET:
187195
return OB_ERROR_PACKET;
188-
case OB_TABLE_API_META_INFO_EXECUTE:
196+
case Pcodes.OB_TABLE_API_META_INFO_EXECUTE:
189197
return OB_TABLE_META_INFO_EXECUTE;
198+
case Pcodes.OB_TABLE_API_HBASE_EXECUTE:
199+
return OB_TABLE_API_HBASE_EXECUTE;
190200
}
191201
throw new IllegalArgumentException("Unknown Rpc command code value ," + value);
192202
}

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public class LocationUtil {
225225

226226
private static final String TABLE_GROUP_GET_TABLE_NAME = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_name "
227227
+ "FROM oceanbase.CDB_OB_TABLEGROUP_TABLES "
228-
+ "WHERE tablegroup_name = ? and tenant_id = ? limit 1;";
228+
+ "WHERE tablegroup_name = ? and tenant_id = ? order by table_name limit 1;";
229229

230230
private static final int TEMPLATE_PART_ID = -1;
231231

@@ -2507,18 +2507,24 @@ public static void parseObVerionFromLogin(String serverVersion)
25072507
throws FeatureNotSupportedException {
25082508
Pattern pattern;
25092509
if (serverVersion.startsWith("OceanBase_CE")) {
2510-
// serverVersion in CE is like "OceanBase_CE 4.0.0.0"
2511-
pattern = Pattern.compile("OceanBase_CE\\s+(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)");
2510+
// serverVersion in CE is like "OceanBase_CE 4.0.0.0 (+ Obproxy 4.3.6.0), content in () is optional and valid after Obproxy 4.3.5"
2511+
pattern = Pattern.compile("OceanBase_CE\\s+(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)(\\s+\\+\\s+(Obproxy)\\s+(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+))?");
25122512
} else {
2513-
// serverVersion is like "OceanBase 4.0.0.0"
2514-
pattern = Pattern.compile("OceanBase\\s+(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)");
2513+
// serverVersion is like "OceanBase 4.0.0.0 (+ Obproxy 4.3.6.0), content in () is optional and valid after Obproxy 4.3.5"
2514+
pattern = Pattern.compile("OceanBase\\s+(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)(\\s+\\+\\s+(Obproxy)\\s+(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+))?");
25152515
}
25162516
Matcher matcher = pattern.matcher(serverVersion);
25172517
if (matcher.find() && ObGlobal.OB_VERSION == 0) {
25182518
ObGlobal.OB_VERSION = ObGlobal.calcVersion(Integer.parseInt(matcher.group(1)),
25192519
(short) Integer.parseInt(matcher.group(2)),
25202520
(byte) Integer.parseInt(matcher.group(3)),
25212521
(byte) Integer.parseInt(matcher.group(4)));
2522+
if (matcher.group(5) != null && matcher.group(6) != null) { // Obproxy part
2523+
ObGlobal.OB_PROXY_VERSION = ObGlobal.calcVersion(Integer.parseInt(matcher.group(7)),
2524+
(short) Integer.parseInt(matcher.group(8)),
2525+
(byte) Integer.parseInt(matcher.group(9)),
2526+
(byte) Integer.parseInt(matcher.group(10)));
2527+
}
25222528
if (ObGlobal.obVsnMajor() < 4) {
25232529
throw new FeatureNotSupportedException(
25242530
"The current client version supports only server version greater than or equal to 4.0.0.0");

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ public interface Pcodes {
3535
int OB_TABLE_API_MOVE = 0x1124;
3636
int OB_TABLE_API_LS_EXECUTE = 0x1125;
3737
int OB_TABLE_API_META_INFO_EXECUTE = 0x1128;
38+
int OB_TABLE_API_HBASE_EXECUTE = 0x1129;
3839
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*-
2+
* #%L
3+
* OBKV Table Client Framework
4+
* %%
5+
* Copyright (C) 2021 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
19+
20+
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
21+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
22+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil;
23+
import com.alipay.oceanbase.rpc.util.ObByteBuf;
24+
import com.alipay.oceanbase.rpc.util.Serialization;
25+
26+
public class ObHbaseCell extends AbstractPayload {
27+
// if there are more elements in the future
28+
// add new constructor to create new array
29+
// cannot List because its size cannot be initialized and is dynamic
30+
private ObObj[] propertiesValue = null; // Q T V (TTL)
31+
32+
public ObHbaseCell(boolean isCellTTL) {
33+
propertiesValue = isCellTTL ? new ObObj[4] : new ObObj[3];
34+
}
35+
36+
public void encode(ObByteBuf buf) {
37+
// 0. encode header
38+
encodeHeader(buf);
39+
40+
// 1. encode properties value
41+
Serialization.encodeVi64(buf, propertiesValue.length);
42+
for (ObObj obj : propertiesValue) {
43+
ObTableSerialUtil.encode(buf, obj);
44+
}
45+
}
46+
47+
@Override
48+
public byte[] encode() {
49+
byte[] bytes = new byte[(int) getPayloadSize()];
50+
int idx = 0;
51+
52+
// 0. encode header
53+
idx = encodeHeader(bytes, idx);
54+
55+
// 1. encode properties value
56+
int len = Serialization.getNeedBytes(propertiesValue.length);
57+
System.arraycopy(Serialization.encodeVi64(propertiesValue.length), 0, bytes, idx, len);
58+
idx += len;
59+
for (ObObj obj : propertiesValue) {
60+
len = ObTableSerialUtil.getEncodedSize(obj);
61+
System.arraycopy(ObTableSerialUtil.encode(obj), 0, bytes, idx, len);
62+
idx += len;
63+
}
64+
return bytes;
65+
}
66+
67+
@Override
68+
public long getPayloadContentSize() {
69+
if (this.payLoadContentSize == INVALID_PAYLOAD_CONTENT_SIZE) {
70+
long payloadContentSize = 0;
71+
// add properties value array size and value
72+
payloadContentSize += Serialization.getNeedBytes(propertiesValue.length);
73+
for (ObObj obj : propertiesValue) {
74+
payloadContentSize += ObTableSerialUtil.getEncodedSize(obj);
75+
}
76+
this.payLoadContentSize = payloadContentSize;
77+
}
78+
return this.payLoadContentSize;
79+
}
80+
81+
public ObObj getQ() {
82+
return propertiesValue[0];
83+
}
84+
85+
public ObObj getT() {
86+
return propertiesValue[1];
87+
}
88+
89+
public ObObj getV() {
90+
return propertiesValue[2];
91+
}
92+
93+
public ObObj getTTL() {
94+
if (propertiesValue.length == 3) {
95+
throw new IllegalArgumentException("table schema has no cell TTL");
96+
}
97+
return propertiesValue[3];
98+
}
99+
100+
public void setQ(ObObj Q) {
101+
propertiesValue[0] = Q;
102+
}
103+
104+
public void setT(ObObj T) {
105+
propertiesValue[1] = T;
106+
}
107+
108+
public void setV(ObObj V) {
109+
propertiesValue[2] = V;
110+
}
111+
112+
public void setTTL(ObObj TTL) {
113+
if (propertiesValue.length == 3) {
114+
throw new IllegalArgumentException("table schema has no cell TTL");
115+
}
116+
propertiesValue[3] = TTL;
117+
}
118+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
19+
20+
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
21+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
22+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil;
23+
import com.alipay.oceanbase.rpc.util.Serialization;
24+
import io.netty.buffer.ByteBuf;
25+
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
29+
public class ObHbaseCellResult extends AbstractPayload {
30+
private long keyIndex; // mapping to the right HBase operation
31+
private List<ObObj> propertiesValues = new ArrayList<>(); // HBase Get result
32+
33+
public ObHbaseCellResult() {
34+
keyIndex = -1;
35+
}
36+
37+
public ObHbaseCellResult(long keyIndex) {
38+
this.keyIndex = keyIndex;
39+
}
40+
41+
public long getKeyIndex() {
42+
return keyIndex;
43+
}
44+
45+
public List<ObObj> getPropertiesValues() {
46+
return propertiesValues;
47+
}
48+
49+
@Override
50+
public byte[] encode() {
51+
return new byte[0];
52+
}
53+
54+
@Override
55+
public Object decode(ByteBuf buf) {
56+
// 0. decode version
57+
super.decode(buf);
58+
59+
// 1. decode key index
60+
keyIndex = Serialization.decodeVi64(buf);
61+
62+
// 2. decode properties values
63+
int len = (int) Serialization.decodeVi64(buf);
64+
for (int i = 0; i < len; ++i) {
65+
ObObj obj = new ObObj();
66+
ObTableSerialUtil.decode(buf, obj);
67+
this.propertiesValues.add(obj);
68+
}
69+
return this;
70+
}
71+
72+
@Override
73+
public long getPayloadContentSize() {
74+
return 0;
75+
}
76+
}

0 commit comments

Comments
 (0)