Skip to content

Commit d872280

Browse files
authored
add query_and_mutate check_and_mutate (#305)
* add query_and_mutate check_and_mutate * fix review
1 parent 6660335 commit d872280

File tree

4 files changed

+83
-1
lines changed

4 files changed

+83
-1
lines changed

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2727
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2828
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
29+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate;
30+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
31+
import com.alipay.oceanbase.rpc.queryandmutate.QueryAndMutate;
2932
import com.alipay.oceanbase.rpc.table.ObTableClientLSBatchOpsImpl;
3033
import com.alipay.oceanbase.rpc.table.api.Table;
3134
import com.alipay.oceanbase.rpc.table.api.TableBatchOps;
@@ -110,6 +113,12 @@ public BatchOperation addOperation(TableQuery... queries) {
110113
return this;
111114
}
112115

116+
public BatchOperation addOperation(QueryAndMutate qm) {
117+
lastType = ObTableOperationType.QUERY_AND_MUTATE;
118+
this.operations.add(qm);
119+
return this;
120+
}
121+
113122
/*
114123
* add get
115124
*/
@@ -336,6 +345,9 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
336345
} else if (operation instanceof TableQuery) {
337346
TableQuery query = (TableQuery) operation;
338347
batchOps.addOperation(query);
348+
} else if (operation instanceof QueryAndMutate) {
349+
QueryAndMutate qm = (QueryAndMutate) operation;
350+
batchOps.addOperation(qm);
339351
} else {
340352
throw new IllegalArgumentException(
341353
"The operations in batch must be all checkAndInsUp or all non-checkAndInsUp");

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public enum ObTableOperationType {
3030
TTL(9), // observer internal type, not used by client
3131
CHECK_AND_INSERT_UP(10), PUT(11), // override row
3232
TRIGGER(12), // internal op type
33-
INVALID(15);
33+
REDIS(13), // not used
34+
QUERY_AND_MUTATE(14), CHECK_AND_MUTATE(15), INVALID(16);
3435

3536
private int value;
3637
private static Map<Integer, ObTableOperationType> map = new HashMap<Integer, ObTableOperationType>();
@@ -46,6 +47,10 @@ public enum ObTableOperationType {
4647
false, // TTL
4748
true, // CHECK_AND_INSERT_UP
4849
false, // PUT
50+
false, // TRIGGER
51+
false, // REDIS
52+
true, // QUERY_AND_MUTATE,
53+
true, // CHECK_AND_MUTATE,
4954
false // INVALID
5055
);
5156

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.queryandmutate;
19+
20+
import com.alipay.oceanbase.rpc.mutation.Mutation;
21+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
22+
23+
public class QueryAndMutate {
24+
private ObTableQuery query;
25+
private Mutation mutation;
26+
27+
public QueryAndMutate(ObTableQuery query, Mutation mutation) {
28+
this.query = query;
29+
this.mutation = mutation;
30+
}
31+
32+
public ObTableQuery getQuery() {
33+
return query;
34+
}
35+
36+
public Mutation getMutation() {
37+
return mutation;
38+
}
39+
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
3737
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
3838
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
39+
import com.alipay.oceanbase.rpc.queryandmutate.QueryAndMutate;
3940
import com.alipay.oceanbase.rpc.threadlocal.ThreadLocalMap;
4041
import com.alipay.oceanbase.rpc.util.MonitorUtil;
4142
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
@@ -225,6 +226,31 @@ public void addOperation(TableQuery query) throws Exception {
225226
addOperation(singleOp);
226227
}
227228

229+
public void addOperation(QueryAndMutate queryAndMutate) {
230+
231+
ObTableSingleOp singleOp = new ObTableSingleOp();
232+
ObTableQuery obTableQuery = queryAndMutate.getQuery();
233+
if (queryAndMutate.getMutation() instanceof Delete) {
234+
Delete delete = (Delete) queryAndMutate.getMutation();
235+
ObTableSingleOpQuery singleOpQuery = ObTableSingleOpQuery.getInstance(obTableQuery.getIndexName(),
236+
obTableQuery.getKeyRanges(), obTableQuery.getSelectColumns(),
237+
obTableQuery.getScanOrder(), obTableQuery.isHbaseQuery(),
238+
obTableQuery.gethTableFilter(), obTableQuery.getObKVParams(),
239+
obTableQuery.getFilterString());
240+
singleOp.setQuery(singleOpQuery);
241+
singleOp.setQuery(singleOpQuery);
242+
singleOp.setSingleOpType(ObTableOperationType.QUERY_AND_MUTATE);
243+
String[] rowKeyNames = delete.getRowKey().getColumns();
244+
Object[] rowKeyValues = delete.getRowKey().getValues();
245+
ObTableSingleOpEntity entity = ObTableSingleOpEntity.getInstance(rowKeyNames, rowKeyValues,
246+
null, null);
247+
singleOp.addEntity(entity);
248+
addOperation(singleOp);
249+
} else {
250+
throw new ObTableException("invalid operation type " + queryAndMutate.getMutation().getOperationType());
251+
}
252+
}
253+
228254
public void addOperation(Mutation mutation) throws Exception {
229255
// entity
230256
String[] rowKeyNames = null;

0 commit comments

Comments
 (0)