Skip to content

Commit 1bf4c65

Browse files
committed
add server_can_retry flag to enable retry capacity for new client operations
1 parent f1401b5 commit 1bf4c65

File tree

12 files changed

+146
-16
lines changed

12 files changed

+146
-16
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2103,6 +2103,8 @@ public ObPayload executeWithRetry(ObTable obTable, ObPayload request, String tab
21032103
if (request instanceof ObTableAbstractOperationRequest) {
21042104
long tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId();
21052105
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
2106+
} else if (request instanceof ObHbaseRequest) {
2107+
tableRoute.refreshTabletLocationBatch(tableName);
21062108
}
21072109
}
21082110
return result;
@@ -2397,14 +2399,28 @@ public ObPayload execute(final ObHbaseRequest request) throws Exception {
23972399
} else {
23982400
Row row = new Row();
23992401
// get the first cell from the first cfRows to route
2400-
String realTableName = request.getCfRows().get(0).getRealTableName();
2402+
// use the first table in tablegroup to route
2403+
String realTableName = null;
2404+
try {
2405+
realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false);
2406+
} catch (ObTableNotExistException e) {
2407+
if (request.getCfRows().size() != 1) {
2408+
throw new ObTableUnexpectedException("multi-cf operations must create tablegroup and binding tables");
2409+
} else {
2410+
realTableName = request.getCfRows().get(0).getRealTableName();
2411+
}
2412+
}
2413+
if (realTableName == null) {
2414+
throw new ObTableUnexpectedException("realTableName is null");
2415+
}
24012416
int keyIdx = request.getCfRows().get(0).getKeyIndex(0);
24022417
row.add("K", request.getKeys().get(keyIdx).getValue());
24032418
row.add("Q", request.getCfRows().get(0).getCells().get(0).getQ().getValue());
24042419
row.add("T", request.getCfRows().get(0).getCells().get(0).getT().getValue());
24052420
ObTableParam tableParam = tableRoute.getTableParam(realTableName, row);
24062421
ObTable obTable = tableParam.getObTable();
2407-
return executeWithRetry(obTable, request, request.getTableName());
2422+
request.setTimeout(obTable.getObTableOperationTimeout());
2423+
return executeWithRetry(obTable, request, realTableName);
24082424
}
24092425
}
24102426

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ public class LocationUtil {
6262
private static final ObjectMapper objectMapper = new ObjectMapper();
6363

6464
static {
65-
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
66-
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
65+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
66+
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
67+
.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
6768
loadJdbcDriver();
6869
}
6970

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class BatchOperation {
4848
boolean returnOneResult = false;
4949
boolean hasCheckAndInsUp = false;
5050
boolean hasGet = false;
51+
boolean serverCanRetry = false;
5152
ObTableOperationType lastType = ObTableOperationType.INVALID;
5253
boolean isSameType = true;
5354
protected ObTableEntityType entityType = ObTableEntityType.KV;
@@ -181,6 +182,10 @@ public void setEntityType(ObTableEntityType entityType) {
181182
this.entityType = entityType;
182183
}
183184

185+
public void setServerCanRetry(boolean canRetry) {
186+
this.serverCanRetry = canRetry;
187+
}
188+
184189
public BatchOperation setIsAtomic(boolean isAtomic) {
185190
this.isAtomic = isAtomic;
186191
return this;
@@ -313,6 +318,7 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
313318
if (client instanceof ObTableClient) {
314319
batchOps = new ObTableClientLSBatchOpsImpl(tableName, (ObTableClient) client);
315320
batchOps.setEntityType(entityType);
321+
batchOps.setServerCanRetry(serverCanRetry);
316322
for (Object operation : operations) {
317323
if (operation instanceof CheckAndInsUp) {
318324
checkAndInsUpCnt++;

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@
3333
/*
3434
OB_SERIALIZE_MEMBER(ObHbaseRequest,
3535
credential_,
36+
option_flag_,
37+
op_type_,
3638
table_name_,
37-
rows_);
39+
keys_,
40+
cf_rows_);
3841
*/
3942
/*
4043
[k1][k2][k3]...
@@ -44,8 +47,9 @@
4447
public class ObHbaseRequest extends AbstractPayload implements Credentialable {
4548
protected ObBytesString credential;
4649
protected String tableName; // HBase tableName, OceanBase tablegroup_name
50+
protected ObTableHbaseReqFlag optionFlag = new ObTableHbaseReqFlag();
4751
protected ObTableOperationType opType;
48-
protected List<ObObj> keys = new ArrayList<>();
52+
protected List<ObObj> keys = new ArrayList<>();
4953
protected List<ObHbaseCfRows> cfRows;
5054

5155
public ObHbaseRequest() {
@@ -87,17 +91,20 @@ public byte[] encode() {
8791
// 2. encode tableName
8892
Serialization.encodeVString(buf, tableName);
8993

90-
// 3. encode op_type
94+
// 3. encode option flag
95+
Serialization.encodeVi64(buf, optionFlag.getValue());
96+
97+
// 4. encode op_type
9198
Serialization.encodeI8(buf, opType.getByteValue());
9299

93-
// 4. encode keys array size and keys
100+
// 5. encode keys array size and keys
94101
Serialization.encodeVi64(buf, keys.size());
95102
for (int i = 0; i < keys.size(); i++) {
96103
ObObj key = keys.get(i);
97104
ObTableSerialUtil.encode(buf, key);
98105
}
99106

100-
// 5. encode same cf rows array size and rows
107+
// 6. encode same cf rows array size and rows
101108
Serialization.encodeVi64(buf, cfRows.size());
102109
for (int i = 0; i < cfRows.size(); i++) {
103110
ObHbaseCfRows sameCfRows = cfRows.get(i);
@@ -128,6 +135,7 @@ public long getPayloadContentSize() {
128135
if (payLoadContentSize == INVALID_PAYLOAD_CONTENT_SIZE) {
129136
payLoadContentSize = Serialization.getNeedBytes(credential)
130137
+ Serialization.getNeedBytes(tableName)
138+
+ Serialization.getNeedBytes(optionFlag.getValue())
131139
+ Serialization.getNeedBytes(opType.getByteValue());
132140

133141
// Size for keys array
@@ -145,6 +153,10 @@ public long getPayloadContentSize() {
145153
return payLoadContentSize;
146154
}
147155

156+
public void setTimeout(long timeout) {
157+
this.timeout = timeout;
158+
}
159+
148160
@Override
149161
public void setCredential(ObBytesString credential) {
150162
this.credential = credential;
@@ -166,6 +178,14 @@ public void setCfRows(List<ObHbaseCfRows> sameCfRows) {
166178
this.cfRows = sameCfRows;
167179
}
168180

181+
public void setServerCanRetry(boolean canRetry) {
182+
optionFlag.setFlagServerCanRetry(canRetry);
183+
}
184+
185+
public boolean getServerCanRetry() {
186+
return optionFlag.getFlagServerCanRetry();
187+
}
188+
169189
public ObBytesString getCredential() {
170190
return credential;
171191
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,14 @@ public void setOptionFlag(ObTableOptionFlag optionFlagflag) {
208208
this.option_flag = optionFlagflag;
209209
}
210210

211+
public void setServerCanRetry(boolean canRetry) {
212+
option_flag.setServerCanRetry(canRetry);
213+
}
214+
215+
public boolean getServerCanRetry() {
216+
return option_flag.isServerCanRetry();
217+
}
218+
211219
/*
212220
* Is returning affected entity.
213221
*/
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
2+
3+
public class ObTableHbaseReqFlag {
4+
private static final int FLAG_SERVER_CAN_RETRY = 1 << 0;
5+
private long flags = 0;
6+
7+
public long getValue() {
8+
return flags;
9+
}
10+
11+
public void setValue(long flags) {
12+
this.flags = flags;
13+
}
14+
15+
public void setFlagServerCanRetry(boolean serverCanRetry) {
16+
if (serverCanRetry) {
17+
flags |= FLAG_SERVER_CAN_RETRY;
18+
} else {
19+
flags &= ~FLAG_SERVER_CAN_RETRY;
20+
}
21+
}
22+
23+
public boolean getFlagServerCanRetry() {
24+
return (this.flags & FLAG_SERVER_CAN_RETRY) == 1;
25+
}
26+
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpFlag.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class ObTableLSOpFlag {
2222
private static final int FLAG_IS_SAME_PROPERTIES_NAMES = 1 << 1;
2323
private static final int FLAG_RETURN_ONE_RESULT = 1 << 2;
2424
private static final int FLAG_NEED_ALL_PROP = 1 << 3;
25+
private static final int FLAG_SERVER_CAN_RETRY = 1 << 4;
2526
private long flags = 0;
2627

2728
public void setFlagIsSameType(boolean isSameType) {
@@ -56,6 +57,14 @@ public void setFlagNeedAllProp(boolean needAllProp) {
5657
}
5758
}
5859

60+
public void setFlagServerCanRetry(boolean serverCanRetry) {
61+
if (serverCanRetry) {
62+
flags |= FLAG_SERVER_CAN_RETRY;
63+
} else {
64+
flags &= ~FLAG_SERVER_CAN_RETRY;
65+
}
66+
}
67+
5968
public long getValue() {
6069
return flags;
6170
}
@@ -75,4 +84,8 @@ public boolean getFlagIsSamePropertiesNames() {
7584
public boolean getFlagNeedAllProp() {
7685
return (flags & FLAG_NEED_ALL_PROP) != 0;
7786
}
87+
88+
public boolean getFlagServerCanRetry() {
89+
return (flags & FLAG_SERVER_CAN_RETRY) != 0;
90+
}
7891
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOperation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,10 @@ public void setIsSamePropertiesNames(boolean isSamePropertiesNames) {
290290

291291
public boolean isNeedAllProp() { return optionFlag.getFlagNeedAllProp(); }
292292

293+
public void setServerCanRetry(boolean canRetry) { optionFlag.setFlagServerCanRetry(canRetry); }
294+
295+
public boolean isServerCanRetry() { return optionFlag.getFlagServerCanRetry(); }
296+
293297
public long getTableId() {
294298
return tableId;
295299
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOptionFlag.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
import java.util.Map;
2222

2323
public enum ObTableOptionFlag {
24-
25-
DEFAULT(0), RETURNING_ROWKEY(1 << 0), USE_PUT(1 << 1), RETURN_ONE_RES(1 << 2);
24+
DEFAULT(0),
25+
RETURNING_ROWKEY(1 << 0),
26+
USE_PUT(1 << 1),
27+
RETURN_ONE_RES(1 << 2),
28+
SERVER_CAN_RETRY(1 << 3);
2629

2730
private int value;
2831
private static Map<Integer, ObTableOptionFlag> map = new HashMap<Integer, ObTableOptionFlag>();
@@ -71,14 +74,14 @@ public void setReturningRowkey(boolean returningRowKey) {
7174
* Get isReturningRowKey.
7275
*/
7376
public boolean isReturningRowKey() {
74-
return this.value == RETURNING_ROWKEY.value;
77+
return (this.value & RETURNING_ROWKEY.value) != 0;
7578
}
7679

7780
/*
7881
* Get usePut.
7982
*/
8083
public boolean isUsePut() {
81-
return this.value == USE_PUT.value;
84+
return (this.value & USE_PUT.value) != 0;
8285
}
8386

8487
/*
@@ -91,7 +94,7 @@ public void setUsePut(boolean usePut) {
9194
}
9295

9396
public boolean isReturnOneResult() {
94-
return (this.value & RETURN_ONE_RES.value) == 1;
97+
return (this.value & RETURN_ONE_RES.value) != 0;
9598
}
9699

97100
public void setReturnOneResult(boolean returnOneResult) {
@@ -101,4 +104,16 @@ public void setReturnOneResult(boolean returnOneResult) {
101104
this.value &= ~(RETURN_ONE_RES.value);
102105
}
103106
}
107+
108+
public boolean isServerCanRetry() {
109+
return (this.value & SERVER_CAN_RETRY.value) != 0;
110+
}
111+
112+
public void setServerCanRetry(boolean serverCanRetry) {
113+
if (serverCanRetry) {
114+
this.value |= SERVER_CAN_RETRY.value;
115+
} else {
116+
this.value &= ~(SERVER_CAN_RETRY.value);
117+
}
118+
}
104119
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/mutate/ObTableQueryAndMutateRequest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ public byte[] encode() {
6666
int len = (int) tableQueryAndMutate.getPayloadSize();
6767
System.arraycopy(tableQueryAndMutate.encode(), 0, bytes, idx, len);
6868

69+
// encode ObBinlogRowImageType::FULL (2)
70+
idx += len;
71+
len = Serialization.getNeedBytes(2);
72+
System.arraycopy(Serialization.encodeVi32(2), 0, bytes, idx, len);
73+
74+
idx += len;
75+
System.arraycopy(Serialization.encodeI8(option_flag.getByteValue()), 0, bytes, idx, 1);
76+
6977
return bytes;
7078
}
7179

@@ -99,7 +107,7 @@ public long getPayloadContentSize() {
99107
if (ObGlobal.obVsnMajor() >= 4)
100108
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
101109
+ Serialization.getNeedBytes(tableId) + 8 + 1
102-
+ tableQueryAndMutate.getPayloadSize();
110+
+ tableQueryAndMutate.getPayloadSize() + 1;
103111
else
104112
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
105113
+ Serialization.getNeedBytes(tableId) + Serialization.getNeedBytes(partitionId)

0 commit comments

Comments
 (0)