Skip to content

Commit eb0d8b5

Browse files
authored
Hbase metrics management (#404)
* add OHOperationType enum * set hbaseOpType to requests and encode to server * set hbase op type to query * fix retry for meeting ObTableNotExists when query tableGroup
1 parent ca63d43 commit eb0d8b5

13 files changed

+188
-12
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2271,13 +2271,15 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
22712271
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
22722272
((ObTableQueryRequest) request).getTableQuery(), this);
22732273
tableQuery.setEntityType(request.getEntityType());
2274+
tableQuery.setHbaseOpType(request.getHbaseOpType());
22742275
return new ObClusterTableQuery(tableQuery).executeInternal();
22752276
} else if (request instanceof ObTableQueryAsyncRequest) {
22762277
// TableGroup -> TableName
22772278
String tableName = request.getTableName();
22782279
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
22792280
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
22802281
tableQuery.setEntityType(request.getEntityType());
2282+
tableQuery.setHbaseOpType(request.getHbaseOpType());
22812283
ObClusterTableQuery clusterTableQuery = new ObClusterTableQuery(tableQuery);
22822284
clusterTableQuery.setAllowDistributeScan(((ObTableQueryAsyncRequest) request).isAllowDistributeScan());
22832285
return clusterTableQuery.asyncExecuteInternal();
@@ -2387,6 +2389,15 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23872389
} else {
23882390
if (ex instanceof ObTableException &&
23892391
(((ObTableException) ex).isNeedRefreshTableEntry() || ((ObTableException) ex).isNeedRetryError())) {
2392+
if (ex instanceof ObTableNotExistException) {
2393+
String logMessage = String.format(
2394+
"exhaust retry while meet TableNotExist Exception, table name: %s, errorCode: %d",
2395+
request.getTableName(),
2396+
((ObTableException) ex).getErrorCode()
2397+
);
2398+
logger.warn(logMessage, ex);
2399+
throw ex;
2400+
}
23902401
logger.warn(
23912402
"tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}",
23922403
request.getTableName(), routeTabletId, ((ObTableException) ex).getErrorCode(), ex);

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alipay.oceanbase.rpc.get.Get;
2525
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
2626
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
27+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.OHOperationType;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2829
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
2930
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate;
@@ -53,6 +54,7 @@ public class BatchOperation {
5354
ObTableOperationType lastType = ObTableOperationType.INVALID;
5455
boolean isSameType = true;
5556
protected ObTableEntityType entityType = ObTableEntityType.KV;
57+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
5658

5759
/*
5860
* default constructor
@@ -90,6 +92,10 @@ public BatchOperation setTable(String tableName) {
9092
return this;
9193
}
9294

95+
public void setHbaseOpType(OHOperationType hbaseOpType) {
96+
this.hbaseOpType = hbaseOpType;
97+
}
98+
9399
/*
94100
* add queries
95101
*/
@@ -325,6 +331,7 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
325331
batchOps.setEntityType(entityType);
326332
batchOps.setServerCanRetry(serverCanRetry);
327333
batchOps.setNeedTabletId(needTabletId);
334+
batchOps.setHbaseOpType(hbaseOpType);
328335
for (Object operation : operations) {
329336
if (operation instanceof CheckAndInsUp) {
330337
checkAndInsUpCnt++;
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
19+
20+
import java.util.*;
21+
22+
public enum OHOperationType {
23+
INVALID(0),
24+
PUT(1),
25+
PUT_LIST(2),
26+
DELETE(3),
27+
DELETE_LIST(4),
28+
GET(5),
29+
GET_LIST(6),
30+
EXISTS(7),
31+
EXISTS_LIST(8),
32+
BATCH(9),
33+
BATCH_CALLBACK(10),
34+
SCAN(11),
35+
CHECK_AND_PUT(12),
36+
CHECK_AND_DELETE(13),
37+
CHECK_AND_MUTATE(14),
38+
APPEND(15),
39+
INCREMENT(16),
40+
INCREMENT_COLUMN_VALUE(17),
41+
MUTATE_ROW(18);
42+
43+
private final int value;
44+
private static final Map<Integer, OHOperationType> map = new HashMap<Integer, OHOperationType>();
45+
46+
static {
47+
for (OHOperationType type : OHOperationType.values()) {
48+
map.put(type.value, type);
49+
}
50+
}
51+
52+
OHOperationType(int value) {
53+
this.value = value;
54+
}
55+
56+
public static OHOperationType valueOf(int value) {
57+
return map.get(value);
58+
}
59+
60+
public int getValue() {
61+
return value;
62+
}
63+
64+
public byte getByteValue() {
65+
return (byte) value;
66+
}
67+
68+
/*
69+
* CHECK_AND_PUT -> checkAndPut
70+
* PUT -> put
71+
*/
72+
public String toCamelCase() {
73+
String name = this.name();
74+
if (name == null || name.isEmpty()) {
75+
return name;
76+
}
77+
78+
String[] parts = name.split("_");
79+
StringBuilder sb = new StringBuilder();
80+
81+
for (int i = 0; i < parts.length; i++) {
82+
String part = parts[i];
83+
if (part == null || part.isEmpty()) {
84+
continue;
85+
}
86+
87+
if (i == 0) {
88+
sb.append(part.toLowerCase());
89+
} else {
90+
if (!part.isEmpty()) {
91+
sb.append(Character.toUpperCase(part.charAt(0)));
92+
if (part.length() > 1) {
93+
sb.append(part.substring(1).toLowerCase());
94+
}
95+
}
96+
}
97+
}
98+
return sb.toString();
99+
}
100+
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
option_flag_,
3939
op_type_,
4040
keys_,
41-
cf_rows_);
41+
cf_rows_,
42+
hbase_op_type_);
4243
*/
4344
/*
4445
[k1][k2][k3]...
@@ -53,6 +54,7 @@ public class ObHbaseRequest extends AbstractPayload implements Credentialable {
5354
protected ObTableOperationType opType;
5455
protected List<ObObj> keys = new ArrayList<>();
5556
protected List<ObHbaseCfRows> cfRows;
57+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
5658

5759
public ObHbaseRequest() {
5860
this.credential = new ObBytesString();
@@ -112,6 +114,9 @@ public byte[] encode() {
112114
ObHbaseCfRows sameCfRows = cfRows.get(i);
113115
sameCfRows.encode(buf);
114116
}
117+
118+
// 7. encode hbase op type, to differentiate put and put list
119+
Serialization.encodeI8(buf, hbaseOpType.getByteValue());
115120

116121
if (buf.pos != buf.bytes.length) {
117122
throw new IllegalArgumentException("error in encode ObHbaseRequest (" +
@@ -151,6 +156,7 @@ public long getPayloadContentSize() {
151156
for (ObHbaseCfRows cfRows : cfRows) {
152157
payLoadContentSize += cfRows.getPayloadSize();
153158
}
159+
payLoadContentSize += 1; // hbase_op_type_
154160
}
155161
return payLoadContentSize;
156162
}
@@ -184,6 +190,10 @@ public void setServerCanRetry(boolean canRetry) {
184190
optionFlag.setFlagServerCanRetry(canRetry);
185191
}
186192

193+
public void setHbaseOpType(OHOperationType hbaseOpType) {
194+
this.hbaseOpType = hbaseOpType;
195+
}
196+
187197
public boolean getServerCanRetry() {
188198
return optionFlag.getFlagServerCanRetry();
189199
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public abstract class ObTableAbstractOperationRequest extends AbstractPayload im
3939
protected ObTableOptionFlag option_flag = ObTableOptionFlag.DEFAULT;
4040
protected boolean returningAffectedEntity = false;
4141
protected boolean returningAffectedRows = false;
42+
protected OHOperationType hbaseOpType = OHOperationType.INVALID; // for table operations, this will be INVALID(0)
4243

4344
/*
4445
* Get payload content size.
@@ -220,6 +221,14 @@ public void setNeedTabletId(boolean needTabletId) {
220221
option_flag.setNeedTabletId(needTabletId);
221222
}
222223

224+
public void setHbaseOpType(OHOperationType hbaseOpType) {
225+
this.hbaseOpType = hbaseOpType;
226+
}
227+
228+
public OHOperationType getHbaseOpType() {
229+
return hbaseOpType;
230+
}
231+
223232
public boolean getNeedTabletId() {
224233
return option_flag.isNeedTabletId();
225234
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333
credential_,
3434
entity_type_,
3535
consistency_level_,
36-
ls_op_);
36+
ls_op_,
37+
hbase_op_type_);
3738
*/
3839
public class ObTableLSOpRequest extends AbstractPayload implements Credentialable {
3940
protected ObBytesString credential;
4041
protected ObTableEntityType entityType = ObTableEntityType.KV;
4142
protected ObTableConsistencyLevel consistencyLevel = ObTableConsistencyLevel.STRONG;
4243
private ObTableLSOperation lsOperation = null;
44+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
4345

4446
/*
4547
* Get pcode.
@@ -70,6 +72,9 @@ public byte[] encode() {
7072

7173
// 4. encode lsOperation
7274
lsOperation.encode(buf);
75+
76+
// 5. encode hbase op type, for table operations, this will be INVALID(0)
77+
Serialization.encodeI8(buf, hbaseOpType.getByteValue());
7378
if (buf.pos != buf.bytes.length) {
7479
throw new IllegalArgumentException("error in encode lsOperationRequest (" +
7580
"pos:" + buf.pos + ", buf.capacity:" + buf.bytes.length + ")");
@@ -99,7 +104,7 @@ public Object decode(ByteBuf buf) {
99104
public long getPayloadContentSize() {
100105
if (payLoadContentSize == INVALID_PAYLOAD_CONTENT_SIZE) {
101106
payLoadContentSize = lsOperation.getPayloadSize() + Serialization.getNeedBytes(credential) + 1 // entityType
102-
+ 1; // consistencyLevel
107+
+ 1 /* consistencyLevel */ + 1 /* hbaseOpType */;
103108
}
104109
return payLoadContentSize;
105110
}
@@ -161,6 +166,10 @@ public void setTableId(long tableId) {
161166
this.lsOperation.setTableId(tableId);
162167
}
163168

169+
public void setHbaseOpType(OHOperationType hbaseOpType) {
170+
this.hbaseOpType = hbaseOpType;
171+
}
172+
164173
/**
165174
* Reset the cached payload content size and propagate to child objects
166175
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/mutate/ObTableQueryAndMutateRequest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
entity_type_,
3636
query_and_mutate_,
3737
binlog_row_image_type_,
38-
option_flag_);
38+
option_flag_,
39+
hbase_op_type_);
3940
*
4041
*/
4142
public class ObTableQueryAndMutateRequest extends ObTableAbstractOperationRequest {
@@ -78,6 +79,9 @@ public byte[] encode() {
7879
idx += len;
7980
System.arraycopy(Serialization.encodeI8(option_flag.getByteValue()), 0, bytes, idx, 1);
8081

82+
idx += 1;
83+
System.arraycopy(Serialization.encodeI8(hbaseOpType.getByteValue()), 0, bytes, idx, 1);
84+
8185
return bytes;
8286
}
8387

@@ -111,7 +115,7 @@ public long getPayloadContentSize() {
111115
if (ObGlobal.obVsnMajor() >= 4)
112116
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
113117
+ Serialization.getNeedBytes(tableId) + 8 + 1
114-
+ tableQueryAndMutate.getPayloadSize() + Serialization.getNeedBytes(type.getValue()) + 1;
118+
+ tableQueryAndMutate.getPayloadSize() + Serialization.getNeedBytes(type.getValue()) + 1 + 1;
115119
else
116120
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
117121
+ Serialization.getNeedBytes(tableId) + Serialization.getNeedBytes(partitionId)

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,8 @@
3030
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
3131
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
3232
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
33-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
33+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
3434
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
35-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
36-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableStreamRequest;
37-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.QueryStreamResult;
3835
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
3936
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult;
4037
import com.alipay.oceanbase.rpc.table.ObTable;
@@ -65,6 +62,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
6562
// global index: key is index table name (be like: __idx_<data_table_id>_<index_name>)
6663
protected String indexTableName;
6764
protected ObTableEntityType entityType;
65+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
6866
protected Map<Long, ObPair<Long, ObTableParam>> expectant;
6967
protected List<String> cacheProperties = new LinkedList<String>();
7068
protected LinkedList<List<ObObj>> cacheRows = new LinkedList<List<ObObj>>();
@@ -832,4 +830,12 @@ public ObTableClient getClient() {
832830
public void setClient(ObTableClient client) {
833831
this.client = client;
834832
}
833+
834+
public OHOperationType getHbaseOpType() {
835+
return hbaseOpType;
836+
}
837+
838+
public void setHbaseOpType(OHOperationType hbaseOpType) {
839+
this.hbaseOpType = hbaseOpType;
840+
}
835841
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQueryRequest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
partition_id_,
3535
entity_type_,
3636
consistency_level_,
37-
query_
37+
query_,
38+
option_flag_,
39+
hbase_op_type_
3840
);
3941
*
4042
*/
@@ -76,6 +78,9 @@ public byte[] encode() {
7678
idx += len;
7779
System.arraycopy(Serialization.encodeVi64(option_flag.getValue()), 0, bytes, idx, 1);
7880

81+
idx += 1;
82+
System.arraycopy(Serialization.encodeI8(hbaseOpType.getByteValue()), 0, bytes, idx, 1);
83+
7984
return bytes;
8085
}
8186

@@ -109,7 +114,7 @@ public Object decode(ByteBuf buf) {
109114
public long getPayloadContentSize() {
110115
if (ObGlobal.obVsnMajor() >= 4)
111116
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
112-
+ Serialization.getNeedBytes(tableId) + 8 + 2 + tableQuery.getPayloadSize() + 1;
117+
+ Serialization.getNeedBytes(tableId) + 8 + 2 + tableQuery.getPayloadSize() + 1 + 1;
113118
else
114119
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
115120
+ Serialization.getNeedBytes(tableId) + Serialization.getNeedBytes(partitionId)

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public void init() throws Exception {
5858
request.setTableQuery(tableQuery);
5959
request.setEntityType(entityType);
6060
request.setConsistencyLevel(getReadConsistency().toObTableConsistencyLevel());
61+
request.setHbaseOpType(hbaseOpType);
6162

6263
// construct async query request
6364
asyncRequest.setObTableQueryRequest(request);

0 commit comments

Comments
 (0)