Skip to content

Commit 81ca089

Browse files
authored
support weak read (#406)
1 parent eb0d8b5 commit 81ca089

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+3076
-1592
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,12 @@ public void setReturnOneResult(boolean returnOneResult) {
184184
tableBatchOps.setReturnOneResult(returnOneResult);
185185
}
186186

187+
@Override
188+
public void setIsWeakRead(boolean isWeakRead) {
189+
super.setIsWeakRead(isWeakRead);
190+
tableBatchOps.setIsWeakRead(isWeakRead);
191+
}
192+
187193
void preCheck() {
188194
List<ObTableOperation> operations = this.tableBatchOps.getObTableBatchOperation()
189195
.getTableOperations();

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,26 @@ public TableQuery setSearchText(String searchText) {
271271
return this;
272272
}
273273

274+
@Override
275+
public TableQuery setReadConsistency(String readConsistency) {
276+
// 同时设置父类和内部 tableClientQuery 的 readConsistency
277+
super.setReadConsistency(readConsistency);
278+
tableClientQuery.setReadConsistency(readConsistency);
279+
return this;
280+
}
281+
282+
@Override
283+
public String getReadConsistency() {
284+
// 返回内部 tableClientQuery 的 readConsistency
285+
return tableClientQuery.getReadConsistency();
286+
}
287+
288+
@Override
289+
public TableQuery setScanRangeColumns(String... columns) {
290+
tableClientQuery.setScanRangeColumns(columns);
291+
return this;
292+
}
293+
274294
public void setAllowDistributeScan(boolean allowDistributeScan) {
275295
tableClientQuery.setAllowDistributeScan(allowDistributeScan);
276296
}

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 86 additions & 221 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package com.alipay.oceanbase.rpc.bolt.protocol;
1919

20-
import com.alipay.oceanbase.rpc.exception.ObTableRoutingWrongException;
21-
import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest;
2220
import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse;
2321
import com.alipay.oceanbase.rpc.protocol.packet.ObRpcPacketHeader;
2422
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -36,9 +34,6 @@
3634
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult;
3735
import com.alipay.remoting.CommandCode;
3836

39-
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_HBASE_EXECUTE;
40-
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_META_INFO_EXECUTE;
41-
4237
public enum ObTablePacketCode implements CommandCode {
4338

4439
OB_TABLE_API_LOGIN(Pcodes.OB_TABLE_API_LOGIN) {

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.alipay.remoting.InvokeContext;
2323
import com.alipay.remoting.InvokeFuture;
2424
import com.alipay.remoting.RemotingCommand;
25-
import com.alipay.oceanbase.rpc.exception.ObTableTimeoutExcetion;
2625
import io.netty.util.Timeout;
2726

2827
import java.net.InetSocketAddress;

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ private boolean connect() throws Exception {
119119

120120
if (tries >= maxTryTimes) {
121121
if (!obTable.isOdpMode()) {
122-
RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer(obTable.getObServerAddr());
122+
RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer(
123+
obTable.getObServerAddr());
123124
RouteTableRefresher.addIntoSuspectIPs(suspectAddr);
124125
}
125126
LOGGER.warn("connect failed after max " + maxTryTimes + " tries "

src/main/java/com/alipay/oceanbase/rpc/exception/ObTableSessionNotExistException.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
118
package com.alipay.oceanbase.rpc.exception;
219

320
public class ObTableSessionNotExistException extends ObTableException {

src/main/java/com/alipay/oceanbase/rpc/get/Get.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,17 @@
2020
import com.alipay.oceanbase.rpc.ObTableClient;
2121
import com.alipay.oceanbase.rpc.mutation.ColumnValue;
2222
import com.alipay.oceanbase.rpc.mutation.Row;
23+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableConsistencyLevel;
2324
import com.alipay.oceanbase.rpc.table.api.Table;
2425

2526
import java.util.Map;
2627

2728
public class Get {
28-
private Table client;
29-
private String tableName;
30-
protected Row rowKey;
31-
protected String[] selectColumns;
32-
33-
public Get() {
34-
tableName = null;
35-
client = null;
36-
rowKey = null;
37-
selectColumns = null;
38-
}
29+
private Table client = null;
30+
private String tableName = null;
31+
private Row rowKey = null;
32+
private String[] selectColumns = null;
33+
private String readConsistency = "";
3934

4035
public Get(Table client, String tableName) {
4136
this.client = client;
@@ -67,14 +62,27 @@ public Get select(String... columns) {
6762
return this;
6863
}
6964

65+
public Get setReadConsistency(String readConsistency) {
66+
this.readConsistency = readConsistency;
67+
return this;
68+
}
69+
70+
public String getReadConsistency() {
71+
return readConsistency;
72+
}
73+
7074
public String[] getSelectColumns() {
7175
return selectColumns;
7276
}
7377

7478
public Map<String, Object> execute() throws Exception {
79+
ObTableConsistencyLevel readConsistency = null;
80+
if (this.readConsistency != null && !this.readConsistency.isEmpty()) {
81+
readConsistency = ObTableConsistencyLevel.getByName(this.readConsistency);
82+
}
7583
if (client == null) {
7684
throw new IllegalArgumentException("client is null");
7785
}
78-
return ((ObTableClient) client).get(tableName, rowKey, selectColumns);
86+
return ((ObTableClient) client).get(tableName, rowKey, selectColumns, readConsistency);
7987
}
8088
}

0 commit comments

Comments
 (0)