Skip to content

Commit 4b01611

Browse files
maochongxinstuBirdFlyGroundWuWeiXinChanwyfanxiao
authored
Secondary part merge master (#321)
* feat: support user_specific_T flag for ObTableQueryAndMutate and ObTableSingleOp (#289) * support sync server capacity and negate expr (#288) * adapt for distributedExecute * Revert "adapt for distributedExecute" This reverts commit 095ccbe. * adapt for distributedExecute (#290) * adapt for distributedExecute * fix for compile * fix for ci test * get server capacity in odp mode * fix for reviews * fix sub expr bug (#291) * fix some bugs (#298) * fix sub expr bug * fix queryAndMutate in secondary part * fix tablet_id error bug (#302) * fix sub expr bug * fix scan bug * add client type * mutation,query_and_mutate set a isUserSpecifiedT based on T (#300) * set partition id invalid when isDistributedExecuteSupported * add query_and_mutate check_and_mutate (#305) * add query_and_mutate check_and_mutate * fix review * abs func * add kvMode test for secondary part (#307) * Secondary part rebase master (#309) * support mysql date/datetime (#287) * fix addr expired throw (#292) * add fts case (#294) * Move JDBC driver loading to static initializer (#293) * Optimize JDBC driver loading with thread-safe initialization and reduce log * Move JDBC driver loading to static initializer * remove unused code * remove unused code * Fix issue where low traffic prevents timely detection and refresh of RS when the peer goes offline (#297) * [maven-release-plugin] prepare release obkv-table-client-1.4.1 * [maven-release-plugin] prepare for next development iteration * Update README.md * Fix ob down ls timeout (#299) * SQLException need to refresh obTable roster * refresh tablet address after refresh obTable roster * refresh table roster if obtable is null * use force renew to update roster * add log * update addrExpired after refresh * Capture exceptions in the refreshIndexInfo and do syncRefreshMetadata (#301) * Capture exceptions in the refreshIndexInfo and do syncRefreshMetadata * fix * fix commonExecute not catch refreshException * [maven-release-plugin] prepare release obkv-table-client-1.4.2 * [maven-release-plugin] prepare for next development iteration --------- Co-authored-by: vanson <[email protected]> Co-authored-by: GroundWu <[email protected]> Co-authored-by: junye <[email protected]> Co-authored-by: fan <[email protected]> Co-authored-by: Ziyu Shi <[email protected]> * add version check for distributed execute (#311) * change obhbaseParams defualt param * [Fix] hbase multi partition scan return repetitive data * fix ClientType Incompatible (#316) * fix allowDistributeScan default value (#318) * fix erase table group get nullptr (#315) --------- Co-authored-by: stuBirdFly <[email protected]> Co-authored-by: GroundWu <[email protected]> Co-authored-by: vanson <[email protected]> Co-authored-by: WeiXinChan <[email protected]> Co-authored-by: junye <[email protected]> Co-authored-by: fan <[email protected]> Co-authored-by: Ziyu Shi <[email protected]> Co-authored-by: stuBirdFly <[email protected]> Co-authored-by: shen <[email protected]>
1 parent 960b9e0 commit 4b01611

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1819
-762
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,4 +270,8 @@ public TableQuery setSearchText(String searchText) {
270270
tableClientQuery.setSearchText(searchText);
271271
return this;
272272
}
273+
274+
public void setAllowDistributeScan(boolean allowDistributeScan) {
275+
tableClientQuery.setAllowDistributeScan(allowDistributeScan);
276+
}
273277
}

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,18 @@ public static boolean isReturnOneResultSupport() {
9797

9898
public static boolean isHBaseBatchGetSupport() {
9999
return OB_VERSION >= OB_VERSION_4_2_5_2 && OB_VERSION < OB_VERSION_4_3_0_0
100-
|| OB_VERSION >= OB_VERSION_4_3_5_1;
100+
|| OB_VERSION >= OB_VERSION_4_3_5_1;
101101
}
102102

103103
public static boolean isHBaseBatchSupport() {
104104
return OB_VERSION >= OB_VERSION_4_2_5_2 && OB_VERSION < OB_VERSION_4_3_0_0
105105
|| OB_VERSION >= OB_VERSION_4_3_5_0;
106106
}
107107

108+
public static boolean isDistributedExecSupport() {
109+
return OB_VERSION >= OB_VERSION_4_3_5_2;
110+
}
111+
108112
public static boolean isCellTTLSupport() {
109113
return OB_VERSION >= OB_VERSION_4_3_5_1;
110114
}
@@ -121,5 +125,7 @@ public static boolean isCellTTLSupport() {
121125

122126
public static final long OB_VERSION_4_3_5_1 = calcVersion(4, (short) 3, (byte) 5, (byte) 1);
123127

128+
public static final long OB_VERSION_4_3_5_2 = calcVersion(4, (short) 3, (byte) 5, (byte) 2);
129+
124130
public static long OB_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
125131
}

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import static com.alipay.oceanbase.rpc.location.model.TableEntry.HBASE_ROW_KEY_ELEMENT;
6363
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartIdCalculator.*;
6464
import static com.alipay.oceanbase.rpc.property.Property.*;
65+
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.INVALID_TABLET_ID;
6566
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*;
6667
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*;
6768
import static java.lang.String.format;
@@ -433,7 +434,7 @@ private void initMetadata() throws Exception {
433434
if (odpMode) {
434435
try {
435436
odpTable = new ObTable.Builder(odpAddr, odpPort) //
436-
.setLoginInfo(tenantName, fullUserName, password, database) //
437+
.setLoginInfo(tenantName, fullUserName, password, database, getClientType(runningMode)) //
437438
.setProperties(getProperties()).setConfigs(TableConfigs).build();
438439
} catch (Exception e) {
439440
logger
@@ -486,7 +487,7 @@ private void initMetadata() throws Exception {
486487
// 应急可以直接observer切主
487488
try {
488489
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
489-
.setLoginInfo(tenantName, userName, password, database) //
490+
.setLoginInfo(tenantName, userName, password, database, getClientType(runningMode)) //
490491
.setProperties(getProperties()).setConfigs(TableConfigs).build();
491492
tableRoster.put(addr, obTable);
492493
servers.add(addr);
@@ -973,7 +974,7 @@ public void syncRefreshMetadata(boolean forceRenew) throws Exception {
973974
}
974975

975976
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
976-
.setLoginInfo(tenantName, userName, password, database) //
977+
.setLoginInfo(tenantName, userName, password, database, getClientType(runningMode)) //
977978
.setProperties(getProperties()).setConfigs(getTableConfigs())
978979
.build();
979980
ObTable oldObTable = tableRoster.putIfAbsent(addr, obTable); // not control concurrency
@@ -1393,7 +1394,8 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
13931394
tableEntryAcquireSocketTimeout,
13941395
serverAddressPriorityTimeout,
13951396
serverAddressCachingTimeout,
1396-
sysUA
1397+
sysUA,
1398+
!getServerCapacity().isSupportDistributedExecute() /* withLsId */
13971399
);
13981400
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
13991401
break;
@@ -1829,7 +1831,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, ObTableQuery query,
18291831
}
18301832
}
18311833

1832-
if (partIdMapObTable.size() > 1) {
1834+
if (partIdMapObTable.size() > 1 && !getServerCapacity().isSupportDistributedExecute()) {
18331835
throw new ObTablePartitionConsistentException(
18341836
"query and mutate must be a atomic operation");
18351837
} else if (partIdMapObTable.size() < 1) {
@@ -1944,7 +1946,7 @@ public ObTable addTable(ObServerAddr addr){
19441946
try {
19451947
logger.info("server from response not exist in route cache, server ip {}, port {} , execute add Table.", addr.getIp(), addr.getSvrPort());
19461948
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
1947-
.setLoginInfo(tenantName, userName, password, database) //
1949+
.setLoginInfo(tenantName, userName, password, database, getClientType(runningMode)) //
19481950
.setProperties(getProperties()).build();
19491951
tableRoster.put(addr, obTable);
19501952
return obTable;
@@ -3696,7 +3698,15 @@ ObTableQueryAndMutateRequest obTableQueryAndMutate(final ObTableOperation operat
36963698
operations.addTableOperation(operation);
36973699

36983700
ObTableQueryAndMutate queryAndMutate = buildObTableQueryAndMutate(obTableQuery, operations);
3699-
3701+
if (runningMode == RunningMode.HBASE) {
3702+
if (operation.getEntity() != null || operation.getEntity().getRowKeySize() != 3) {
3703+
throw new IllegalArgumentException("rowkey size is not 3");
3704+
}
3705+
long ts = (long)operation.getEntity().getRowKeyValue(2).getValue();
3706+
if (ts != -Long.MAX_VALUE) {
3707+
queryAndMutate.setIsUserSpecifiedT(true);
3708+
}
3709+
}
37003710
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(queryAndMutate,
37013711
tableName);
37023712

@@ -3743,7 +3753,9 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
37433753
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
37443754
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
37453755
tableQuery.setEntityType(request.getEntityType());
3746-
return new ObClusterTableQuery(tableQuery).asyncExecuteInternal();
3756+
ObClusterTableQuery clusterTableQuery = new ObClusterTableQuery(tableQuery);
3757+
clusterTableQuery.setAllowDistributeScan(((ObTableQueryAsyncRequest) request).isAllowDistributeScan());
3758+
return clusterTableQuery.asyncExecuteInternal();
37473759
} else if (request instanceof ObTableBatchOperationRequest) {
37483760
ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl(
37493761
request.getTableName(),
@@ -3819,15 +3831,17 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
38193831
}
38203832

38213833
// Check if partIdMapObTable size is greater than 1
3822-
if (partIdMapObTable.size() > 1) {
3834+
boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute();
3835+
if (partIdMapObTable.size() > 1 && !isDistributedExecuteSupported) {
38233836
throw new ObTablePartitionConsistentException(
38243837
"query and mutate must be a atomic operation");
38253838
}
38263839
// Proceed with the operation
38273840
Map.Entry<Long, ObTableParam> entry = partIdMapObTable.entrySet().iterator().next();
38283841
ObTableParam tableParam = entry.getValue();
38293842
request.setTableId(tableParam.getTableId());
3830-
request.setPartitionId(tableParam.getPartitionId());
3843+
long partitionId = isDistributedExecuteSupported ? INVALID_TABLET_ID : tableParam.getPartitionId();
3844+
request.setPartitionId(partitionId);
38313845
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
38323846
ObTable obTable = tableParam.getObTable();
38333847

@@ -4254,6 +4268,14 @@ public enum RunningMode {
42544268
NORMAL, HBASE;
42554269
}
42564270

4271+
private ObTableClientType getClientType(RunningMode runningMode) {
4272+
if (ObGlobal.isDistributedExecSupport()) {
4273+
return runningMode == RunningMode.HBASE ? ObTableClientType.JAVA_HBASE_CLIENT : ObTableClientType.JAVA_TABLE_CLIENT;
4274+
} else {
4275+
return ObTableClientType.JAVA_TABLE_CLIENT;
4276+
}
4277+
}
4278+
42574279
/**
42584280
* Get read consistency.
42594281
* @return read consistency level.
@@ -4302,6 +4324,22 @@ public ObServerRoute getRoute(boolean readonly) {
43024324
}
43034325
}
43044326

4327+
public ObTableServerCapacity getServerCapacity() {
4328+
if (isOdpMode()) {
4329+
if (odpTable == null) {
4330+
throw new IllegalStateException("client is not initialized and obTable is empty");
4331+
}
4332+
return odpTable.getServerCapacity();
4333+
} else {
4334+
if (tableRoster == null || tableRoster.isEmpty()) {
4335+
throw new IllegalStateException("client is not initialized and obTable is empty");
4336+
}
4337+
Iterator<ObTable> iterator = tableRoster.values().iterator();
4338+
ObTable firstObTable = iterator.next();
4339+
return firstObTable.getServerCapacity();
4340+
}
4341+
}
4342+
43054343
public void setOdpAddr(String odpAddr) {
43064344
this.odpAddr = odpAddr;
43074345
}
@@ -4577,4 +4615,17 @@ public byte[][] getHBaseTableEndKeys(String hbaseTableName) throws Exception {
45774615

45784616
return endKeys;
45794617
}
4618+
public static void setRowKeyValue(Mutation mutation, int index, Object value) {
4619+
if (mutation.getRowKeyValues() == null || (index < 0 || mutation.getRowKeyValues().size() <= index)) {
4620+
throw new IllegalArgumentException("rowkey is null or index is out of range");
4621+
}
4622+
((ObObj) mutation.getRowKeyValues().get(index)).setValue(value);
4623+
}
4624+
4625+
public static Object getRowKeyValue(Mutation mutation, int index) {
4626+
if (mutation.getRowKeyValues() == null || (index < 0 || index >= mutation.getRowKeyValues().size())) {
4627+
throw new IllegalArgumentException("rowkey is null or index is out of range");
4628+
}
4629+
return ((ObObj) mutation.getRowKeyValues().get(index)).getValue();
4630+
}
45804631
}

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObConnectionFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040

4141
public class ObConnectionFactory implements ConnectionFactory {
4242

43-
private static final Logger logger = LoggerFactory.getLogger(ObConnectionFactory.class);
43+
private static final Logger logger = LoggerFactory
44+
.getLogger(ObConnectionFactory.class);
4445

4546
private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime
4647
.getRuntime().availableProcessors() + 1,

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ private boolean connect() throws Exception {
147147
private void login() throws Exception {
148148
final long start = System.currentTimeMillis();
149149
ObTableLoginRequest request = new ObTableLoginRequest();
150+
request.setClientType((byte) obTable.getClientType().getValue());
150151
request.setTenantName(obTable.getTenantName());
151152
request.setUserName(obTable.getUserName());
152153
request.setDatabaseName(obTable.getDatabase());
@@ -171,6 +172,7 @@ private void login() throws Exception {
171172
if (result != null && result.getCredential() != null
172173
&& result.getCredential().length() > 0) {
173174
credential = result.getCredential();
175+
obTable.setServerCapacity(result.getServerCapabilities());
174176
tenantId = result.getTenantId();
175177
// Set version if missing
176178
if (ObGlobal.obVsnMajor() == 0) {
@@ -259,7 +261,8 @@ public void reConnectAndLogin(String msg) throws ObTableException {
259261
} catch (ObTableServerConnectException ex) {
260262
throw ex;
261263
} catch (Exception ex) {
262-
throw new ObTableConnectionStatusException("check status failed, cause: " + ex.getMessage(), ex);
264+
throw new ObTableConnectionStatusException("check status failed, cause: "
265+
+ ex.getMessage(), ex);
263266
}
264267
}
265268

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
124124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125125
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127-
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]" + response.getMessage());
127+
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
128+
+ response.getMessage());
128129
logger.debug(errMessage);
129130
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
130131
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
@@ -142,7 +143,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
142143
if (resultCode.getRcode() != 0
143144
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
144145
String errMessage = TraceUtil.formatTraceMessage(conn, request,
145-
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]" + response.getMessage());
146+
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
147+
+ response.getMessage());
146148
logger.debug(errMessage);
147149
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
148150
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
@@ -198,25 +200,25 @@ private boolean needFetchAll(int errorCode, int pcode) {
198200
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
199201
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
200202
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode
201-
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
202-
|| errorCode == ResultCodes.OB_GTS_NOT_READY.errorCode
203+
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
204+
|| errorCode == ResultCodes.OB_GTS_NOT_READY.errorCode
203205
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
204206
}
205207

206208
private boolean needFetchPartial(int errorCode) {
207209
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
208-
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
209-
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
210-
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
211-
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
212-
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
213-
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
214-
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
215-
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
216-
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
217-
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
218-
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
219-
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
220-
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
210+
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
211+
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
212+
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
213+
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
214+
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
215+
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
216+
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
217+
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
218+
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
219+
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
220+
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
221+
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
222+
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
221223
}
222224
}

src/main/java/com/alipay/oceanbase/rpc/checkandmutate/CheckAndInsUp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@ public class CheckAndInsUp {
3838
private String tableName;
3939
private ObTableFilter filter;
4040
private InsertOrUpdate insUp;
41-
private boolean checkExists = true;
41+
private boolean checkExists = true;
4242
private boolean rollbackWhenCheckFailed = false;
4343

4444
public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists)
4545
throws IllegalArgumentException {
4646
this(null, null, filter, insUp, check_exists, false);
4747
}
4848

49-
public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists, boolean rollbackWhenCheckFailed)
50-
throws IllegalArgumentException {
49+
public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists,
50+
boolean rollbackWhenCheckFailed) throws IllegalArgumentException {
5151
this(null, null, filter, insUp, check_exists, rollbackWhenCheckFailed);
5252
}
5353

@@ -57,8 +57,9 @@ public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
5757
this(client, tableName, filter, insUp, check_exists, false);
5858
}
5959

60-
public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,InsertOrUpdate insUp,
61-
boolean check_exists, boolean rollbackWhenCheckFailed) throws IllegalArgumentException {
60+
public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
61+
InsertOrUpdate insUp, boolean check_exists, boolean rollbackWhenCheckFailed)
62+
throws IllegalArgumentException {
6263
this.client = client;
6364
this.tableName = tableName;
6465
this.filter = filter;

0 commit comments

Comments
 (0)