Skip to content

Commit 0b273b0

Browse files
authored
add get interface (#207)
1 parent 47ad913 commit 0b273b0

File tree

8 files changed

+491
-26
lines changed

8 files changed

+491
-26
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.alipay.oceanbase.rpc.constant.Constants;
2323
import com.alipay.oceanbase.rpc.exception.*;
2424
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
25+
import com.alipay.oceanbase.rpc.get.Get;
2526
import com.alipay.oceanbase.rpc.location.model.*;
2627
import com.alipay.oceanbase.rpc.location.model.partition.*;
2728
import com.alipay.oceanbase.rpc.mutation.*;
@@ -715,11 +716,11 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
715716
}
716717
}
717718

718-
private abstract class MutationExecuteCallback<T> {
719+
private abstract class OperationExecuteCallback<T> {
719720
private final Row rowKey;
720721
private final List<ObNewRange> keyRanges;
721722

722-
MutationExecuteCallback(Row rowKey, List<ObNewRange> keyRanges) {
723+
OperationExecuteCallback(Row rowKey, List<ObNewRange> keyRanges) {
723724
this.rowKey = rowKey;
724725
this.keyRanges = keyRanges;
725726
}
@@ -773,16 +774,16 @@ public List<ObNewRange> getKeyRanges() {
773774
/**
774775
* For mutation
775776
*/
776-
private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callback)
777+
private <T> T execute(String tableName, OperationExecuteCallback<T> callback)
777778
throws Exception {
778779
// force strong read by default, for backward compatibility.
779-
return executeMutation(tableName, callback, getRoute(false));
780+
return execute(tableName, callback, getRoute(false));
780781
}
781782

782783
/**
783784
* Execute with a route strategy for mutation
784785
*/
785-
private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callback,
786+
private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
786787
ObServerRoute route) throws Exception {
787788
if (tableName == null || tableName.isEmpty()) {
788789
throw new IllegalArgumentException("table name is null");
@@ -818,7 +819,7 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
818819
obPair = getTable(tableName, new ObTableQuery(),
819820
callback.getKeyRanges());
820821
} else {
821-
throw new ObTableException("rowkey and scan range are null in mutation");
822+
throw new ObTableException("RowKey or scan range is null");
822823
}
823824
}
824825
T t = callback.execute(obPair);
@@ -2415,8 +2416,8 @@ public ObPayload updateWithResult(final String tableName, final Row rowKey,
24152416
final List<ObNewRange> keyRanges, final String[] columns,
24162417
final Object[] values) throws Exception {
24172418
final long start = System.currentTimeMillis();
2418-
return executeMutation(tableName,
2419-
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
2419+
return execute(tableName,
2420+
new OperationExecuteCallback<ObPayload>(rowKey, keyRanges) {
24202421
/**
24212422
* Execute.
24222423
*/
@@ -2495,8 +2496,8 @@ public Long execute(ObPair<Long, ObTableParam> obPair) throws Exception {
24952496
public ObPayload deleteWithResult(final String tableName, final Row rowKey,
24962497
final List<ObNewRange> keyRanges) throws Exception {
24972498
final long start = System.currentTimeMillis();
2498-
return executeMutation(tableName,
2499-
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
2499+
return execute(tableName,
2500+
new OperationExecuteCallback<ObPayload>(rowKey, keyRanges) {
25002501

25012502
/**
25022503
* Execute.
@@ -2579,8 +2580,8 @@ public ObPayload insertWithResult(final String tableName, final Row rowKey,
25792580
final List<ObNewRange> keyRanges, final String[] columns,
25802581
final Object[] values) throws Exception {
25812582
final long start = System.currentTimeMillis();
2582-
return executeMutation(tableName,
2583-
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
2583+
return execute(tableName,
2584+
new OperationExecuteCallback<ObPayload>(rowKey, keyRanges) {
25842585
/**
25852586
* Execute.
25862587
*/
@@ -2606,6 +2607,51 @@ public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
26062607
});
26072608
}
26082609

2610+
/**
2611+
* Get.
2612+
*/
2613+
public Get get(String tableName) {
2614+
return new Get(this, tableName);
2615+
}
2616+
2617+
/**
2618+
* get
2619+
* @param tableName which table to insert
2620+
* @param rowKey insert row key
2621+
* @param selectColumns select columns
2622+
* @return execute result
2623+
* @throws Exception exception
2624+
*/
2625+
public Map<String, Object> get(final String tableName, final Row rowKey,
2626+
final String[] selectColumns) throws Exception {
2627+
final long start = System.currentTimeMillis();
2628+
return execute(tableName,
2629+
new OperationExecuteCallback<Map<String, Object>>(rowKey, null) {
2630+
/**
2631+
* Execute.
2632+
*/
2633+
@Override
2634+
public Map<String, Object> execute(ObPair<Long, ObTableParam> obPair) throws Exception {
2635+
long TableTime = System.currentTimeMillis();
2636+
ObTableParam tableParam = obPair.getRight();
2637+
ObTable obTable = tableParam.getObTable();
2638+
ObTableOperationRequest request = ObTableOperationRequest.getInstance(
2639+
tableName, GET, rowKey.getValues(), selectColumns, null,
2640+
obTable.getObTableOperationTimeout());
2641+
request.setTableId(tableParam.getTableId());
2642+
// partId/tabletId
2643+
request.setPartitionId(tableParam.getPartitionId());
2644+
ObPayload result = executeWithRetry(obTable, request, tableName);
2645+
String endpoint = obTable.getIp() + ":" + obTable.getPort();
2646+
MonitorUtil.info(request, database, tableName, "GET", endpoint,
2647+
rowKey.getValues(), (ObTableOperationResult) result, TableTime - start,
2648+
System.currentTimeMillis() - TableTime, getslowQueryMonitorThreshold());
2649+
checkResult(obTable.getIp(), obTable.getPort(), request, result);
2650+
return ((ObTableOperationResult) result).getEntity().getSimpleProperties();
2651+
}
2652+
});
2653+
}
2654+
26092655
/**
26102656
* put with result
26112657
* @param tableName which table to put
@@ -2620,8 +2666,8 @@ public ObPayload putWithResult(final String tableName, final Row rowKey,
26202666
final List<ObNewRange> keyRanges, final String[] columns,
26212667
final Object[] values) throws Exception {
26222668
final long start = System.currentTimeMillis();
2623-
return executeMutation(tableName,
2624-
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
2669+
return execute(tableName,
2670+
new OperationExecuteCallback<ObPayload>(rowKey, keyRanges) {
26252671
/**
26262672
* Execute.
26272673
*/
@@ -2703,8 +2749,8 @@ public ObPayload replaceWithResult(final String tableName, final Row rowKey,
27032749
final List<ObNewRange> keyRanges, final String[] columns,
27042750
final Object[] values) throws Exception {
27052751
final long start = System.currentTimeMillis();
2706-
return executeMutation(tableName,
2707-
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
2752+
return execute(tableName,
2753+
new OperationExecuteCallback<ObPayload>(rowKey, keyRanges) {
27082754
/**
27092755
* Execute.
27102756
*/
@@ -2788,8 +2834,8 @@ public ObPayload insertOrUpdateWithResult(final String tableName, final Row rowK
27882834
final String[] columns, final Object[] values,
27892835
boolean usePut) throws Exception {
27902836
final long start = System.currentTimeMillis();
2791-
return executeMutation(tableName,
2792-
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
2837+
return execute(tableName,
2838+
new OperationExecuteCallback<ObPayload>(rowKey, keyRanges) {
27932839
/**
27942840
* Execute.
27952841
*/
@@ -2894,8 +2940,8 @@ public ObPayload incrementWithResult(final String tableName, final Row rowKey,
28942940
final Object[] values, final boolean withResult)
28952941
throws Exception {
28962942
final long start = System.currentTimeMillis();
2897-
return executeMutation(tableName,
2898-
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
2943+
return execute(tableName,
2944+
new OperationExecuteCallback<ObPayload>(rowKey, keyRanges) {
28992945
/**
29002946
*
29012947
* @param obPair
@@ -2979,8 +3025,8 @@ public ObPayload appendWithResult(final String tableName, final Row rowKey,
29793025
final Object[] values, final boolean withResult)
29803026
throws Exception {
29813027
final long start = System.currentTimeMillis();
2982-
return executeMutation(tableName,
2983-
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
3028+
return execute(tableName,
3029+
new OperationExecuteCallback<ObPayload>(rowKey, keyRanges) {
29843030
@Override
29853031
public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
29863032
long TableTime = System.currentTimeMillis();
@@ -3270,7 +3316,7 @@ public ObPayload mutationWithFilter(final TableQuery tableQuery, final Row rowKe
32703316
// fill a whole range if no range is added explicitly.
32713317
tableQuery.getObTableQuery().addKeyRange(ObNewRange.getWholeRange());
32723318
}
3273-
return executeMutation(tableQuery.getTableName(), new MutationExecuteCallback<ObPayload>(
3319+
return execute(tableQuery.getTableName(), new OperationExecuteCallback<ObPayload>(
32743320
rowKey, keyRanges) {
32753321
/**
32763322
* Execute.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.alipay.oceanbase.rpc.get;
2+
3+
import com.alipay.oceanbase.rpc.ObTableClient;
4+
import com.alipay.oceanbase.rpc.mutation.ColumnValue;
5+
import com.alipay.oceanbase.rpc.mutation.Row;
6+
import com.alipay.oceanbase.rpc.table.api.Table;
7+
8+
import java.util.Map;
9+
10+
public class Get {
11+
private Table client;
12+
private String tableName;
13+
protected Row rowKey;
14+
protected String[] selectColumns;
15+
16+
public Get() {
17+
tableName = null;
18+
client = null;
19+
rowKey = null;
20+
selectColumns = null;
21+
}
22+
23+
public Get(Table client, String tableName) {
24+
this.client = client;
25+
this.tableName = tableName;
26+
}
27+
28+
public Get setRowKey(Row rowKey) {
29+
this.rowKey = rowKey;
30+
if (null != tableName && ((ObTableClient) client).getRowKeyElement(tableName) == null) {
31+
((ObTableClient) client).addRowKeyElement(tableName, this.rowKey.getColumns());
32+
}
33+
return this;
34+
}
35+
36+
public Get setRowKey(ColumnValue... rowKey) {
37+
this.rowKey = new Row(rowKey);
38+
if (null != tableName && ((ObTableClient) client).getRowKeyElement(tableName) == null) {
39+
((ObTableClient) client).addRowKeyElement(tableName, this.rowKey.getColumns());
40+
}
41+
return this;
42+
}
43+
44+
public Row getRowKey() {
45+
return rowKey;
46+
}
47+
48+
public Get select(String... columns) {
49+
this.selectColumns = columns;
50+
return this;
51+
}
52+
53+
public String[] getSelectColumns() {
54+
return selectColumns;
55+
}
56+
57+
public Map<String, Object> execute() throws Exception {
58+
if (client == null) {
59+
throw new IllegalArgumentException("client is null");
60+
}
61+
return ((ObTableClient)client).get(tableName, rowKey, selectColumns);
62+
}
63+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.alipay.oceanbase.rpc.get.result;
2+
3+
import com.alipay.oceanbase.rpc.exception.ObTableException;
4+
import com.alipay.oceanbase.rpc.mutation.Row;
5+
import com.alipay.oceanbase.rpc.mutation.result.OperationResult;
6+
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
7+
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
8+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
9+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableSingleOpResult;
10+
11+
import java.util.Map;
12+
13+
public class GetResult extends OperationResult {
14+
15+
public GetResult(ObPayload result) {
16+
super(result);
17+
}
18+
19+
/*
20+
* get the result rows of operation
21+
*/
22+
public Row getOperationRow() {
23+
Map<String, Object> rowsMap;
24+
switch (result.getPcode()) {
25+
case Pcodes.OB_TABLE_API_EXECUTE:
26+
rowsMap = ((ObTableOperationResult) result).getEntity().getSimpleProperties();
27+
break;
28+
case Pcodes.OB_TABLE_API_LS_EXECUTE:
29+
rowsMap = ((ObTableSingleOpResult) result).getEntity().getSimpleProperties();
30+
break;
31+
default:
32+
throw new ObTableException("unknown result type: " + result.getPcode());
33+
}
34+
return new Row(rowsMap);
35+
}
36+
}

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp;
2222
import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
2323
import com.alipay.oceanbase.rpc.exception.ObTableException;
24+
import com.alipay.oceanbase.rpc.get.Get;
2425
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
2526
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
@@ -98,6 +99,20 @@ public BatchOperation addOperation(TableQuery... queries) {
9899
return this;
99100
}
100101

102+
/*
103+
* add get
104+
*/
105+
public BatchOperation addOperation(Get... gets) {
106+
if (isSameType && lastType != ObTableOperationType.INVALID
107+
&& lastType != ObTableOperationType.GET) {
108+
isSameType = false;
109+
}
110+
111+
lastType = ObTableOperationType.GET;
112+
this.operations.addAll(Arrays.asList(gets));
113+
return this;
114+
}
115+
101116
/*
102117
* add mutations
103118
*/
@@ -251,6 +266,9 @@ private BatchOperationResult executeWithNormalBatchOp() throws Exception {
251266
TableQuery query = (TableQuery) operation;
252267
batchOps.get(query.getRowKey().getValues(),
253268
query.getSelectColumns().toArray((new String[0])));
269+
} else if (operation instanceof Get) {
270+
Get get = (Get) operation;
271+
batchOps.get(get.getRowKey().getValues(), get.getSelectColumns());
254272
} else {
255273
throw new ObTableException("unknown operation " + operation);
256274
}
@@ -294,6 +312,9 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
294312
rowKeyNames.toArray(new String[0]));
295313
hasSetRowkeyElement = true;
296314
}
315+
} else if (operation instanceof Get) {
316+
Get get = (Get) operation;
317+
batchOps.addOperation(get);
297318
} else if (operation instanceof TableQuery) {
298319
TableQuery query = (TableQuery) operation;
299320
batchOps.addOperation(query);

src/main/java/com/alipay/oceanbase/rpc/mutation/result/BatchOperationResult.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alipay.oceanbase.rpc.mutation.result;
1919

2020
import com.alipay.oceanbase.rpc.exception.ObTableException;
21+
import com.alipay.oceanbase.rpc.get.result.GetResult;
2122
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
2223

2324
import java.util.ArrayList;
@@ -81,11 +82,18 @@ public ObTableException getFirstException() {
8182
* get result
8283
*/
8384
public OperationResult get(int pos) {
85+
OperationResult result = null;
8486
if (pos >= results.size()) {
8587
throw new IllegalArgumentException("Invalid pos: " + pos
8688
+ ", while size of results is: " + results.size());
8789
}
88-
return (MutationResult) results.get(pos);
90+
91+
if (results.get(pos) instanceof MutationResult) {
92+
result = (MutationResult) results.get(pos);
93+
} else if (results.get(pos) instanceof GetResult) {
94+
result = (GetResult) results.get(pos);
95+
}
96+
return result;
8997
}
9098

9199
/*

0 commit comments

Comments
 (0)