Skip to content

Commit 4939065

Browse files
committed
set hbaseOpType to requests and encode to server
1 parent 76271a5 commit 4939065

File tree

8 files changed

+67
-25
lines changed

8 files changed

+67
-25
lines changed

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alipay.oceanbase.rpc.get.Get;
2525
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
2626
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
27+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.OHOperationType;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2829
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
2930
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate;
@@ -53,6 +54,7 @@ public class BatchOperation {
5354
ObTableOperationType lastType = ObTableOperationType.INVALID;
5455
boolean isSameType = true;
5556
protected ObTableEntityType entityType = ObTableEntityType.KV;
57+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
5658

5759
/*
5860
* default constructor
@@ -90,6 +92,10 @@ public BatchOperation setTable(String tableName) {
9092
return this;
9193
}
9294

95+
public void setHbaseOpType(OHOperationType hbaseOpType) {
96+
this.hbaseOpType = hbaseOpType;
97+
}
98+
9399
/*
94100
* add queries
95101
*/
@@ -325,6 +331,7 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
325331
batchOps.setEntityType(entityType);
326332
batchOps.setServerCanRetry(serverCanRetry);
327333
batchOps.setNeedTabletId(needTabletId);
334+
batchOps.setHbaseOpType(hbaseOpType);
328335
for (Object operation : operations) {
329336
if (operation instanceof CheckAndInsUp) {
330337
checkAndInsUpCnt++;

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/OHOperationType.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,25 @@
33
import java.util.*;
44

55
public enum OHOperationType {
6-
PUT(0),
7-
PUT_LIST(1),
8-
DELETE(2),
9-
DELETE_LIST(3),
10-
GET(4),
11-
GET_LIST(5),
12-
EXISTS(6),
13-
EXISTS_LIST(7),
14-
BATCH(8),
15-
BATCH_CALLBACK(9),
16-
SCAN(10),
17-
CHECK_AND_PUT(11),
18-
CHECK_AND_DELETE(12),
19-
CHECK_AND_MUTATE(13),
20-
APPEND(14),
21-
INCREMENT(15),
22-
INCREMENT_COLUMN_VALUE(16),
23-
MUTATE_ROW(17);
6+
INVALID(0),
7+
PUT(1),
8+
PUT_LIST(2),
9+
DELETE(3),
10+
DELETE_LIST(4),
11+
GET(5),
12+
GET_LIST(6),
13+
EXISTS(7),
14+
EXISTS_LIST(8),
15+
BATCH(9),
16+
BATCH_CALLBACK(10),
17+
SCAN(11),
18+
CHECK_AND_PUT(12),
19+
CHECK_AND_DELETE(13),
20+
CHECK_AND_MUTATE(14),
21+
APPEND(15),
22+
INCREMENT(16),
23+
INCREMENT_COLUMN_VALUE(17),
24+
MUTATE_ROW(18);
2425

2526
private final int value;
2627
private static final Map<Integer, OHOperationType> map = new HashMap<Integer, OHOperationType>();

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class ObHbaseRequest extends AbstractPayload implements Credentialable {
5353
protected ObTableOperationType opType;
5454
protected List<ObObj> keys = new ArrayList<>();
5555
protected List<ObHbaseCfRows> cfRows;
56+
protected OHOperationType hbaseOpType;
5657

5758
public ObHbaseRequest() {
5859
this.credential = new ObBytesString();
@@ -184,6 +185,10 @@ public void setServerCanRetry(boolean canRetry) {
184185
optionFlag.setFlagServerCanRetry(canRetry);
185186
}
186187

188+
public void setHbaseOpType(OHOperationType hbaseOpType) {
189+
this.hbaseOpType = hbaseOpType;
190+
}
191+
187192
public boolean getServerCanRetry() {
188193
return optionFlag.getFlagServerCanRetry();
189194
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public abstract class ObTableAbstractOperationRequest extends AbstractPayload im
3939
protected ObTableOptionFlag option_flag = ObTableOptionFlag.DEFAULT;
4040
protected boolean returningAffectedEntity = false;
4141
protected boolean returningAffectedRows = false;
42+
protected OHOperationType hbaseOpType = OHOperationType.INVALID; // for table operations, this will be INVALID(0)
4243

4344
/*
4445
* Get payload content size.
@@ -220,6 +221,10 @@ public void setNeedTabletId(boolean needTabletId) {
220221
option_flag.setNeedTabletId(needTabletId);
221222
}
222223

224+
public void setHbaseOpType(OHOperationType hbaseOpType) {
225+
this.hbaseOpType = hbaseOpType;
226+
}
227+
223228
public boolean getNeedTabletId() {
224229
return option_flag.isNeedTabletId();
225230
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333
credential_,
3434
entity_type_,
3535
consistency_level_,
36-
ls_op_);
36+
ls_op_,
37+
hbase_op_type_);
3738
*/
3839
public class ObTableLSOpRequest extends AbstractPayload implements Credentialable {
3940
protected ObBytesString credential;
4041
protected ObTableEntityType entityType = ObTableEntityType.KV;
4142
protected ObTableConsistencyLevel consistencyLevel = ObTableConsistencyLevel.STRONG;
4243
private ObTableLSOperation lsOperation = null;
44+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
4345

4446
/*
4547
* Get pcode.
@@ -70,6 +72,9 @@ public byte[] encode() {
7072

7173
// 4. encode lsOperation
7274
lsOperation.encode(buf);
75+
76+
// 5. encode hbase op type, for table operations, this will be INVALID(0)
77+
Serialization.encodeI8(buf, hbaseOpType.getByteValue());
7378
if (buf.pos != buf.bytes.length) {
7479
throw new IllegalArgumentException("error in encode lsOperationRequest (" +
7580
"pos:" + buf.pos + ", buf.capacity:" + buf.bytes.length + ")");
@@ -99,7 +104,7 @@ public Object decode(ByteBuf buf) {
99104
public long getPayloadContentSize() {
100105
if (payLoadContentSize == INVALID_PAYLOAD_CONTENT_SIZE) {
101106
payLoadContentSize = lsOperation.getPayloadSize() + Serialization.getNeedBytes(credential) + 1 // entityType
102-
+ 1; // consistencyLevel
107+
+ 1 /* consistencyLevel */ + 1 /* hbaseOpType */;
103108
}
104109
return payLoadContentSize;
105110
}
@@ -161,6 +166,10 @@ public void setTableId(long tableId) {
161166
this.lsOperation.setTableId(tableId);
162167
}
163168

169+
public void setHbaseOpType(OHOperationType hbaseOpType) {
170+
this.hbaseOpType = hbaseOpType;
171+
}
172+
164173
/**
165174
* Reset the cached payload content size and propagate to child objects
166175
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/mutate/ObTableQueryAndMutateRequest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
entity_type_,
3636
query_and_mutate_,
3737
binlog_row_image_type_,
38-
option_flag_);
38+
option_flag_,
39+
hbase_op_type_);
3940
*
4041
*/
4142
public class ObTableQueryAndMutateRequest extends ObTableAbstractOperationRequest {
@@ -78,6 +79,9 @@ public byte[] encode() {
7879
idx += len;
7980
System.arraycopy(Serialization.encodeI8(option_flag.getByteValue()), 0, bytes, idx, 1);
8081

82+
idx += 1;
83+
System.arraycopy(Serialization.encodeI8(hbaseOpType.getByteValue()), 0, bytes, idx, 1);
84+
8185
return bytes;
8286
}
8387

@@ -111,7 +115,7 @@ public long getPayloadContentSize() {
111115
if (ObGlobal.obVsnMajor() >= 4)
112116
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
113117
+ Serialization.getNeedBytes(tableId) + 8 + 1
114-
+ tableQueryAndMutate.getPayloadSize() + Serialization.getNeedBytes(type.getValue()) + 1;
118+
+ tableQueryAndMutate.getPayloadSize() + Serialization.getNeedBytes(type.getValue()) + 1 + 1;
115119
else
116120
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
117121
+ Serialization.getNeedBytes(tableId) + Serialization.getNeedBytes(partitionId)

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQueryRequest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
partition_id_,
3535
entity_type_,
3636
consistency_level_,
37-
query_
37+
query_,
38+
option_flag_,
39+
hbase_op_type_
3840
);
3941
*
4042
*/
@@ -76,6 +78,9 @@ public byte[] encode() {
7678
idx += len;
7779
System.arraycopy(Serialization.encodeVi64(option_flag.getValue()), 0, bytes, idx, 1);
7880

81+
idx += 1;
82+
System.arraycopy(Serialization.encodeI8(hbaseOpType.getByteValue()), 0, bytes, idx, 1);
83+
7984
return bytes;
8085
}
8186

@@ -109,7 +114,7 @@ public Object decode(ByteBuf buf) {
109114
public long getPayloadContentSize() {
110115
if (ObGlobal.obVsnMajor() >= 4)
111116
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
112-
+ Serialization.getNeedBytes(tableId) + 8 + 2 + tableQuery.getPayloadSize() + 1;
117+
+ Serialization.getNeedBytes(tableId) + 8 + 2 + tableQuery.getPayloadSize() + 1 + 1;
113118
else
114119
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
115120
+ Serialization.getNeedBytes(tableId) + Serialization.getNeedBytes(partitionId)

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public class ObTableClientLSBatchOpsImpl extends AbstractTableBatchOps {
6969
private boolean returningAffectedEntity = false;
7070
private boolean needAllProp = false;
7171
private boolean serverCanRetry = false;
72-
private boolean needTabletId = false;
72+
private boolean needTabletId = false;
73+
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
7374
private List<ObTableSingleOp> batchOperation;
7475

7576
/*
@@ -94,6 +95,10 @@ public List<ObTableSingleOp> getSingleOperations() {
9495
return batchOperation;
9596
}
9697

98+
public void setHbaseOpType(OHOperationType hbaseOpType) {
99+
this.hbaseOpType = hbaseOpType;
100+
}
101+
97102
/*
98103
* Get.
99104
*/
@@ -591,6 +596,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
591596
tableLsOpRequest.setTableId(tableId);
592597
tableLsOpRequest.setEntityType(entityType);
593598
tableLsOpRequest.setTimeout(operationTimeout);
599+
tableLsOpRequest.setHbaseOpType(hbaseOpType);
594600

595601
ObTableLSOpResult subLSOpResult;
596602
boolean needRefreshPartitionLocation = false;

0 commit comments

Comments
 (0)