Skip to content

Commit 6721123

Browse files
authored
support rollback when check failed (#265)
* support rollback when check failed * fix review comment
1 parent c40a50d commit 6721123

File tree

11 files changed

+189
-15
lines changed

11 files changed

+189
-15
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3299,7 +3299,7 @@ public ObPayload mutationWithFilter(final TableQuery tableQuery, final Row rowKe
32993299
final ObTableOperation operation, final boolean withResult)
33003300
throws Exception {
33013301
return mutationWithFilter(tableQuery, rowKey, keyRanges, operation, withResult, false,
3302-
false);
3302+
false, false);
33033303
}
33043304

33053305
/**
@@ -3311,13 +3311,15 @@ public ObPayload mutationWithFilter(final TableQuery tableQuery, final Row rowKe
33113311
* @param withResult whether to bring back result
33123312
* @param checkAndExecute whether execute check and execute instead of query and mutate
33133313
* @param checkExists whether to check exists or not
3314+
* @param rollbackWhenCheckFailed whether rollback or not when check failed
33143315
* @return execute result
33153316
* @throws Exception exception
33163317
*/
33173318
public ObPayload mutationWithFilter(final TableQuery tableQuery, final Row rowKey,
33183319
final List<ObNewRange> keyRanges,
33193320
final ObTableOperation operation, final boolean withResult,
3320-
final boolean checkAndExecute, final boolean checkExists)
3321+
final boolean checkAndExecute, final boolean checkExists,
3322+
final boolean rollbackWhenCheckFailed)
33213323
throws Exception {
33223324
final long start = System.currentTimeMillis();
33233325
if (tableQuery != null && tableQuery.getObTableQuery().getKeyRanges().isEmpty()) {
@@ -3343,6 +3345,7 @@ public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
33433345
request.setPartitionId(tableParam.getPartitionId());
33443346
request.getTableQueryAndMutate().setIsCheckAndExecute(checkAndExecute);
33453347
request.getTableQueryAndMutate().setIsCheckNoExists(!checkExists);
3348+
request.getTableQueryAndMutate().setIsRollbackWhenCheckFailed(rollbackWhenCheckFailed);
33463349
ObPayload result = executeWithRetry(obTable, request, tableQuery.getTableName());
33473350
String endpoint = obTable.getIp() + ":" + obTable.getPort();
33483351
MonitorUtil.info(request, database, tableQuery.getTableName(), "QUERY_AND_MUTATE",
@@ -3671,6 +3674,14 @@ public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter,
36713674
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists);
36723675
}
36733676

3677+
/**
3678+
* checkAndInsUp.
3679+
*/
3680+
public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp,
3681+
boolean checkExists, boolean rollbackWhenCheckFailed) {
3682+
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists, rollbackWhenCheckFailed);
3683+
}
3684+
36743685
/**
36753686
* Set full username
36763687
* @param fullUserName user name

src/main/java/com/alipay/oceanbase/rpc/checkandmutate/CheckAndInsUp.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,33 @@ public class CheckAndInsUp {
3838
private String tableName;
3939
private ObTableFilter filter;
4040
private InsertOrUpdate insUp;
41-
boolean checkExists = true;
41+
private boolean checkExists = true;
42+
private boolean rollbackWhenCheckFailed = false;
4243

4344
public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists)
4445
throws IllegalArgumentException {
45-
this.filter = filter;
46-
this.insUp = insUp;
47-
this.checkExists = check_exists;
48-
this.tableName = null;
49-
this.client = null;
46+
this(null, null, filter, insUp, check_exists, false);
47+
}
48+
49+
public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists, boolean rollbackWhenCheckFailed)
50+
throws IllegalArgumentException {
51+
this(null, null, filter, insUp, check_exists, rollbackWhenCheckFailed);
5052
}
5153

5254
public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
5355
InsertOrUpdate insUp, boolean check_exists)
5456
throws IllegalArgumentException {
57+
this(client, null, filter, insUp, check_exists, false);
58+
}
59+
60+
public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,InsertOrUpdate insUp,
61+
boolean check_exists, boolean rollbackWhenCheckFailed) throws IllegalArgumentException {
5562
this.client = client;
5663
this.tableName = tableName;
5764
this.filter = filter;
5865
this.insUp = insUp;
5966
this.checkExists = check_exists;
67+
this.rollbackWhenCheckFailed = rollbackWhenCheckFailed;
6068
}
6169

6270
public Row getRowKey() {
@@ -75,6 +83,10 @@ public boolean isCheckExists() {
7583
return checkExists;
7684
}
7785

86+
public boolean isRollbackWhenCheckFailed() {
87+
return rollbackWhenCheckFailed;
88+
}
89+
7890
public MutationResult execute() throws Exception {
7991
if (null == tableName || tableName.isEmpty()) {
8092
throw new ObTableException("table name is null");
@@ -96,6 +108,7 @@ public MutationResult execute() throws Exception {
96108
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE,
97109
insUp.getRowKey().getValues(), insUp.getColumns(), insUp.getValues());
98110

99-
return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation, false, true, checkExists));
111+
return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation,
112+
false, true, checkExists, rollbackWhenCheckFailed));
100113
}
101114
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,9 @@ public enum ResultCodes {
739739
OB_KV_FILTER_PARSE_ERROR(-10514), //
740740
OB_KV_REDIS_PARSE_ERROR(-10515), //
741741
OB_KV_HBASE_INCR_FIELD_IS_NOT_LONG(-10516), //
742-
OB_KV_ODP_TIMEOUT(-10650), OB_ERR_KV_ROUTE_ENTRY_EXPIRE(-10653);
742+
OB_KV_CHECK_FAILED(-10518), //
743+
OB_KV_ODP_TIMEOUT(-10650), //
744+
OB_ERR_KV_ROUTE_ENTRY_EXPIRE(-10653);
743745

744746
public final int errorCode;
745747

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableQueryAndMutateFlag.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
1919

2020
public class ObTableQueryAndMutateFlag {
21-
private static final int FLAG_IS_CHECK_AND_EXECUTE = 1 << 0;
22-
private static final int FLAG_IS_CHECK_NOT_EXISTS = 1 << 1;
23-
private long flags = 0;
21+
private static final int FLAG_IS_CHECK_AND_EXECUTE = 1 << 0;
22+
private static final int FLAG_IS_CHECK_NOT_EXISTS = 1 << 1;
23+
private static final int FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED = 1 << 2;
24+
private long flags = 0;
2425

2526
public void setIsCheckAndExecute(boolean isCheckAndExecute) {
2627
if (isCheckAndExecute) {
@@ -38,6 +39,14 @@ public void setIsCheckNotExists(boolean isCheckNotExists) {
3839
}
3940
}
4041

42+
public void setIsRollbackWhenCheckFailed(boolean isRollbackWhenCheckFailed) {
43+
if (isRollbackWhenCheckFailed) {
44+
flags |= FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED;
45+
} else {
46+
flags &= ~FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED;
47+
}
48+
}
49+
4150
public long getValue() {
4251
return flags;
4352
}
@@ -46,7 +55,11 @@ public boolean isCheckNotExists() {
4655
return (flags & FLAG_IS_CHECK_NOT_EXISTS) != 0;
4756
}
4857

49-
public boolean isChekAndExecute() {
58+
public boolean isCheckAndExecute() {
5059
return (flags & FLAG_IS_CHECK_AND_EXECUTE) != 0;
5160
}
61+
62+
public boolean isRollbackWhenCheckFailed() {
63+
return (flags & FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED) != 0;
64+
}
5265
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ public void setIsCheckNoExists(boolean isCheckNoExists) {
128128
singleOpFlag.setIsCheckNotExists(isCheckNoExists);
129129
}
130130

131+
public void setIsRollbackWhenCheckFailed(boolean isRollbackWhenCheckFailed) {
132+
singleOpFlag.setIsRollbackWhenCheckFailed(isRollbackWhenCheckFailed);
133+
}
134+
131135
public ObTableOperationType getSingleOpType() {
132136
return singleOpType;
133137
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpFlag.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
public class ObTableSingleOpFlag {
2121
private static final int FLAG_IS_CHECK_NOT_EXISTS = 1 << 0;
22+
private static final int FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED = 1 << 1;
2223
private long flags = 0;
2324

2425
public void setIsCheckNotExists(boolean isCheckNotExists) {
@@ -29,6 +30,14 @@ public void setIsCheckNotExists(boolean isCheckNotExists) {
2930
}
3031
}
3132

33+
public void setIsRollbackWhenCheckFailed(boolean isRollbackWhenCheckFailed) {
34+
if (isRollbackWhenCheckFailed) {
35+
flags |= FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED;
36+
} else {
37+
flags &= ~FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED;
38+
}
39+
}
40+
3241
public long getValue() {
3342
return flags;
3443
}
@@ -37,6 +46,10 @@ public boolean isCheckNotExists() {
3746
return (flags & FLAG_IS_CHECK_NOT_EXISTS) != 0;
3847
}
3948

49+
public boolean isRollbackWhenCheckFailed() {
50+
return (flags & FLAG_IS_ROLLBACK_WHEN_CHECK_FAILED) != 0;
51+
}
52+
4053
void setValue(long value) {
4154
flags = value;
4255
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/mutate/ObTableQueryAndMutate.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,8 @@ public void setIsCheckAndExecute(boolean isCheckAndExecute) {
164164
public void setIsCheckNoExists(boolean isCheckNoExists) {
165165
queryAndMutateFlag.setIsCheckNotExists(isCheckNoExists);
166166
}
167+
168+
public void setIsRollbackWhenCheckFailed(boolean isRollbackWhenCheckFailed) {
169+
queryAndMutateFlag.setIsRollbackWhenCheckFailed(isRollbackWhenCheckFailed);
170+
}
167171
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,21 @@ public BatchOperation batchOperation(String tableName) {
347347
}
348348

349349
/**
350-
* Insert.
350+
* checkAndInsUp.
351351
*/
352352
public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter,
353353
InsertOrUpdate insUp, boolean checkExists) {
354354
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists);
355355
}
356356

357+
/**
358+
* checkAndInsUp.
359+
*/
360+
public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp,
361+
boolean checkExists, boolean rollbackWhenCheckFailed) {
362+
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists, rollbackWhenCheckFailed);
363+
}
364+
357365
/*
358366
* Execute.
359367
*/

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ public void addOperation(CheckAndInsUp checkAndInsUp) {
176176
ObTableSingleOp singleOp = new ObTableSingleOp();
177177
singleOp.setSingleOpType(ObTableOperationType.CHECK_AND_INSERT_UP);
178178
singleOp.setIsCheckNoExists(!checkAndInsUp.isCheckExists());
179+
singleOp.setIsRollbackWhenCheckFailed(checkAndInsUp.isRollbackWhenCheckFailed());
179180
singleOp.setQuery(query);
180181
singleOp.addEntity(entity);
181182

src/main/java/com/alipay/oceanbase/rpc/table/api/Table.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,6 @@ Map<String, Object> append(String tableName, Object[] rowkeys, String[] columns,
9797

9898
CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp,
9999
boolean checkExists);
100+
CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp,
101+
boolean checkExists, boolean rollbackWhenCheckFailed);
100102
}

0 commit comments

Comments
 (0)