Skip to content

Commit a644aba

Browse files
WeiXinChanJackShi148
authored andcommitted
new hbase protocal (#363)
1 parent 398e55a commit a644aba

File tree

6 files changed

+347
-1
lines changed

6 files changed

+347
-1
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2386,6 +2386,24 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23862386
+ "is not supported. make sure the correct version");
23872387
}
23882388

2389+
public ObPayload execute(final ObHbaseRequest request) throws Exception {
2390+
if (request.getTableName() == null || request.getTableName().isEmpty()) {
2391+
throw new IllegalArgumentException("table name is null");
2392+
}
2393+
if (isOdpMode()) {
2394+
return getOdpTable().execute(request);
2395+
} else {
2396+
Row row = new Row();
2397+
row.add("K", request.getKeys().get(0).getValue());
2398+
row.add("Q", request.getCells().get(0).getQ().getValue());
2399+
row.add("T", request.getCells().get(0).getT().getValue());
2400+
row.add("V", request.getCells().get(0).getV().getValue());
2401+
ObTableParam tableParam = tableRoute.getTableParam(request.getTableName(), row);
2402+
ObTable obTable = tableParam.getObTable();
2403+
return executeWithRetry(obTable, request, request.getTableName());
2404+
}
2405+
}
2406+
23892407
private ObTableQueryAndMutate buildObTableQueryAndMutate(ObTableQuery obTableQuery,
23902408
ObTableBatchOperation obTableBatchOperation) {
23912409
ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
2727
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
2828
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObFetchPartitionMetaResult;
29+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObHbaseResult;
2930
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
3031
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
3132
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
@@ -35,6 +36,7 @@
3536
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult;
3637
import com.alipay.remoting.CommandCode;
3738

39+
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_HBASE_EXECUTE;
3840
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_META_INFO_EXECUTE;
3941

4042
public enum ObTablePacketCode implements CommandCode {
@@ -143,6 +145,12 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
143145
public ObPayload newPayload(ObRpcPacketHeader header) {
144146
return new ObTableMetaResponse();
145147
}
148+
},
149+
OB_TABLE_API_HBASE_EXECUTE(Pcodes.OB_TABLE_API_HBASE_EXECUTE) {
150+
@Override
151+
public ObPayload newPayload(ObRpcPacketHeader header) {
152+
return new ObHbaseResult();
153+
}
146154
};
147155

148156
private short value;
@@ -185,8 +193,10 @@ public static ObTablePacketCode valueOf(short value) {
185193
return OB_TABLE_API_MOVE;
186194
case Pcodes.OB_ERROR_PACKET:
187195
return OB_ERROR_PACKET;
188-
case OB_TABLE_API_META_INFO_EXECUTE:
196+
case Pcodes.OB_TABLE_API_META_INFO_EXECUTE:
189197
return OB_TABLE_META_INFO_EXECUTE;
198+
case Pcodes.OB_TABLE_API_HBASE_EXECUTE:
199+
return OB_TABLE_API_HBASE_EXECUTE;
190200
}
191201
throw new IllegalArgumentException("Unknown Rpc command code value ," + value);
192202
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ public interface Pcodes {
3535
int OB_TABLE_API_MOVE = 0x1124;
3636
int OB_TABLE_API_LS_EXECUTE = 0x1125;
3737
int OB_TABLE_API_META_INFO_EXECUTE = 0x1128;
38+
int OB_TABLE_API_HBASE_EXECUTE = 0x1129;
3839
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*-
2+
* #%L
3+
* OBKV Table Client Framework
4+
* %%
5+
* Copyright (C) 2021 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
19+
20+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
21+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil;
22+
import com.alipay.oceanbase.rpc.util.ObByteBuf;
23+
24+
public class ObHbaseQTV {
25+
private ObObj Q;
26+
private ObObj T;
27+
private ObObj V;
28+
29+
public ObHbaseQTV() {
30+
this.Q = new ObObj();
31+
this.T = new ObObj();
32+
this.V = new ObObj();
33+
}
34+
35+
public ObHbaseQTV(ObObj Q, ObObj T, ObObj V) {
36+
this.Q = Q;
37+
this.T = T;
38+
this.V = V;
39+
}
40+
41+
public void encode(ObByteBuf buf) {
42+
ObTableSerialUtil.encode(buf, Q);
43+
ObTableSerialUtil.encode(buf, T);
44+
ObTableSerialUtil.encode(buf, V);
45+
}
46+
47+
public long getPayloadSize() {
48+
return ObTableSerialUtil.getEncodedSize(Q) + ObTableSerialUtil.getEncodedSize(T)
49+
+ ObTableSerialUtil.getEncodedSize(V);
50+
}
51+
52+
public ObObj getQ() {
53+
return Q;
54+
}
55+
56+
public ObObj getT() {
57+
return T;
58+
}
59+
60+
public ObObj getV() {
61+
return V;
62+
}
63+
64+
public void setQ(ObObj Q) {
65+
this.Q = Q;
66+
}
67+
68+
public void setT(ObObj T) {
69+
this.T = T;
70+
}
71+
72+
public void setV(ObObj V) {
73+
this.V = V;
74+
}
75+
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
19+
20+
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
21+
import com.alipay.oceanbase.rpc.protocol.payload.Credentialable;
22+
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
23+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
24+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil;
25+
import com.alipay.oceanbase.rpc.util.ObByteBuf;
26+
import com.alipay.oceanbase.rpc.util.ObBytesString;
27+
import com.alipay.oceanbase.rpc.util.Serialization;
28+
import io.netty.buffer.ByteBuf;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
/*
34+
OB_SERIALIZE_MEMBER(ObHbaseRequest,
35+
credential_,
36+
table_name_,
37+
rows_);
38+
*/
39+
/*
40+
[k1][k2][k3]...
41+
[1] [3] [2] ...
42+
[QTV] [QTV][QTV][QTV] [QTV][QTV]
43+
*/
44+
public class ObHbaseRequest extends AbstractPayload implements Credentialable {
45+
protected ObBytesString credential;
46+
protected String tableName;
47+
protected List<ObObj> keys = new ArrayList<>();
48+
protected List<Integer> cellNumArray = new ArrayList<>();
49+
protected List<ObHbaseQTV> cells = new ArrayList<>();
50+
51+
public ObHbaseRequest() {
52+
this.credential = new ObBytesString();
53+
this.tableName = "";
54+
this.keys = new ArrayList<>();
55+
this.cellNumArray = new ArrayList<>();
56+
this.cells = new ArrayList<>();
57+
}
58+
59+
public ObHbaseRequest(ObBytesString credential, String tableName, List<ObObj> keys, List<Integer> cellNumArray, List<ObHbaseQTV> cells) {
60+
this.credential = credential;
61+
this.tableName = tableName;
62+
this.keys = keys;
63+
this.cellNumArray = cellNumArray;
64+
this.cells = cells;
65+
}
66+
67+
/*
68+
* Get pcode.
69+
*/
70+
@Override
71+
public int getPcode() {
72+
return Pcodes.OB_TABLE_API_HBASE_EXECUTE;
73+
}
74+
75+
/*
76+
* Encode.
77+
*/
78+
@Override
79+
public byte[] encode() {
80+
long calculatedSize = getPayloadSize();
81+
ObByteBuf buf = new ObByteBuf((int) calculatedSize);
82+
83+
// 0. encode ObHbaseRequest header
84+
encodeHeader(buf);
85+
86+
// 1. encode credential
87+
Serialization.encodeBytesString(buf, credential);
88+
89+
// 2. encode tableName
90+
Serialization.encodeVString(buf, tableName);
91+
92+
// 3. encode keys array size and keys
93+
Serialization.encodeVi64(buf, keys.size());
94+
for (int i = 0; i < keys.size(); i++) {
95+
ObObj key = keys.get(i);
96+
ObTableSerialUtil.encode(buf, key);
97+
}
98+
99+
// 4. encode cellNumArray size and elements
100+
Serialization.encodeVi64(buf, cellNumArray.size());
101+
for (int i = 0; i < cellNumArray.size(); i++) {
102+
int cellNum = cellNumArray.get(i);
103+
Serialization.encodeVi64(buf, cellNum);
104+
}
105+
106+
// 5. encode cells array size and cells
107+
Serialization.encodeVi64(buf, cells.size());
108+
for (int i = 0; i < cells.size(); i++) {
109+
ObHbaseQTV cell = cells.get(i);
110+
cell.encode(buf);
111+
}
112+
113+
if (buf.pos != buf.bytes.length) {
114+
throw new IllegalArgumentException("error in encode ObHbaseRequest (" +
115+
"pos:" + buf.pos + ", buf.capacity:" + buf.bytes.length + ", calculatedSize:" + calculatedSize + ")");
116+
}
117+
return buf.bytes;
118+
}
119+
120+
/*
121+
* Decode.
122+
*/
123+
@Override
124+
public Object decode(ByteBuf buf) {
125+
super.decode(buf);
126+
return this;
127+
}
128+
129+
/*
130+
* Get payload content size.
131+
*/
132+
@Override
133+
public long getPayloadContentSize() {
134+
if (payLoadContentSize == INVALID_PAYLOAD_CONTENT_SIZE) {
135+
payLoadContentSize = Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName);
136+
137+
// Size for keys array
138+
payLoadContentSize += Serialization.getNeedBytes(keys.size());
139+
for (ObObj key : keys) {
140+
payLoadContentSize += ObTableSerialUtil.getEncodedSize(key);
141+
}
142+
143+
// Size for cellNumArray
144+
payLoadContentSize += Serialization.getNeedBytes(cellNumArray.size());
145+
for (int cellNum : cellNumArray) {
146+
payLoadContentSize += Serialization.getNeedBytes(cellNum);
147+
}
148+
149+
// Size for cells array
150+
payLoadContentSize += Serialization.getNeedBytes(cells.size());
151+
for (ObHbaseQTV cell : cells) {
152+
payLoadContentSize += cell.getPayloadSize();
153+
}
154+
}
155+
return payLoadContentSize;
156+
}
157+
158+
@Override
159+
public void setCredential(ObBytesString credential) {
160+
this.credential = credential;
161+
}
162+
163+
public void setTableName(String tableName) {
164+
this.tableName = tableName;
165+
}
166+
167+
public void setKeys(List<ObObj> keys) {
168+
this.keys = keys;
169+
}
170+
171+
public void setCellNumArray(List<Integer> cellNumArray) {
172+
this.cellNumArray = cellNumArray;
173+
}
174+
175+
public void setCells(List<ObHbaseQTV> cells) {
176+
this.cells = cells;
177+
}
178+
179+
public ObBytesString getCredential() {
180+
return credential;
181+
}
182+
183+
public String getTableName() {
184+
return tableName;
185+
}
186+
187+
public List<ObObj> getKeys() {
188+
return keys;
189+
}
190+
191+
public List<ObHbaseQTV> getCells() {
192+
return cells;
193+
}
194+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
19+
20+
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
21+
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
22+
23+
public class ObHbaseResult extends AbstractPayload {
24+
25+
/*
26+
* Get pcode.
27+
*/
28+
@Override
29+
public int getPcode() {
30+
return Pcodes.OB_TABLE_API_HBASE_EXECUTE;
31+
}
32+
33+
@Override
34+
public byte[] encode() {
35+
byte[] bytes = new byte[(int) getPayloadSize()];
36+
int idx = 0;
37+
38+
// 0. encode header
39+
idx = encodeHeader(bytes, idx);
40+
41+
return bytes;
42+
}
43+
44+
@Override
45+
public long getPayloadContentSize() {
46+
return 0;
47+
}
48+
}

0 commit comments

Comments
 (0)