Skip to content

Commit 7428b9e

Browse files
committed
add client type
1 parent 259b344 commit 7428b9e

31 files changed

+865
-709
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public static boolean isReturnOneResultSupport() {
9797

9898
public static boolean isHBaseBatchGetSupport() {
9999
return OB_VERSION >= OB_VERSION_4_2_5_2 && OB_VERSION < OB_VERSION_4_3_0_0
100-
|| OB_VERSION >= OB_VERSION_4_3_5_1;
100+
|| OB_VERSION >= OB_VERSION_4_3_5_1;
101101
}
102102

103103
public static boolean isHBaseBatchSupport() {

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ private void initMetadata() throws Exception {
433433
if (odpMode) {
434434
try {
435435
odpTable = new ObTable.Builder(odpAddr, odpPort) //
436-
.setLoginInfo(tenantName, fullUserName, password, database) //
436+
.setLoginInfo(tenantName, fullUserName, password, database, getClientType(runningMode)) //
437437
.setProperties(getProperties()).setConfigs(TableConfigs).build();
438438
} catch (Exception e) {
439439
logger
@@ -486,7 +486,7 @@ private void initMetadata() throws Exception {
486486
// 应急可以直接observer切主
487487
try {
488488
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
489-
.setLoginInfo(tenantName, userName, password, database) //
489+
.setLoginInfo(tenantName, userName, password, database, getClientType(runningMode)) //
490490
.setProperties(getProperties()).setConfigs(TableConfigs).build();
491491
tableRoster.put(addr, obTable);
492492
servers.add(addr);
@@ -997,7 +997,7 @@ public void syncRefreshMetadata() throws Exception {
997997
}
998998

999999
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
1000-
.setLoginInfo(tenantName, userName, password, database) //
1000+
.setLoginInfo(tenantName, userName, password, database, getClientType(runningMode)) //
10011001
.setProperties(getProperties()).setConfigs(getTableConfigs())
10021002
.build();
10031003
ObTable oldObTable = tableRoster.putIfAbsent(addr, obTable); // not control concurrency
@@ -1912,7 +1912,7 @@ public ObTable addTable(ObServerAddr addr){
19121912
try {
19131913
logger.info("server from response not exist in route cache, server ip {}, port {} , execute add Table.", addr.getIp(), addr.getSvrPort());
19141914
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
1915-
.setLoginInfo(tenantName, userName, password, database) //
1915+
.setLoginInfo(tenantName, userName, password, database, getClientType(runningMode)) //
19161916
.setProperties(getProperties()).build();
19171917
tableRoster.put(addr, obTable);
19181918
return obTable;
@@ -4198,6 +4198,10 @@ public enum RunningMode {
41984198
NORMAL, HBASE;
41994199
}
42004200

4201+
private ObTableClientType getClientType(RunningMode runningMode) {
4202+
return runningMode == RunningMode.HBASE ? ObTableClientType.JAVA_HBASE_CLIENT : ObTableClientType.JAVA_TABLE_CLIENT;
4203+
}
4204+
42014205
/**
42024206
* Get read consistency.
42034207
* @return read consistency level.

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObConnectionFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040

4141
public class ObConnectionFactory implements ConnectionFactory {
4242

43-
private static final Logger logger = LoggerFactory.getLogger(ObConnectionFactory.class);
43+
private static final Logger logger = LoggerFactory
44+
.getLogger(ObConnectionFactory.class);
4445

4546
private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime
4647
.getRuntime().availableProcessors() + 1,

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ private boolean connect() throws Exception {
147147
private void login() throws Exception {
148148
final long start = System.currentTimeMillis();
149149
ObTableLoginRequest request = new ObTableLoginRequest();
150+
request.setClientType((byte) obTable.getClientType().getValue());
150151
request.setTenantName(obTable.getTenantName());
151152
request.setUserName(obTable.getUserName());
152153
request.setDatabaseName(obTable.getDatabase());
@@ -260,7 +261,8 @@ public void reConnectAndLogin(String msg) throws ObTableException {
260261
} catch (ObTableServerConnectException ex) {
261262
throw ex;
262263
} catch (Exception ex) {
263-
throw new ObTableConnectionStatusException("check status failed, cause: " + ex.getMessage(), ex);
264+
throw new ObTableConnectionStatusException("check status failed, cause: "
265+
+ ex.getMessage(), ex);
264266
}
265267
}
266268

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
124124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125125
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127-
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]" + response.getMessage());
127+
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
128+
+ response.getMessage());
128129
logger.debug(errMessage);
129130
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
130131
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
@@ -142,7 +143,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
142143
if (resultCode.getRcode() != 0
143144
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
144145
String errMessage = TraceUtil.formatTraceMessage(conn, request,
145-
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]" + response.getMessage());
146+
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
147+
+ response.getMessage());
146148
logger.debug(errMessage);
147149
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
148150
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
@@ -198,25 +200,25 @@ private boolean needFetchAll(int errorCode, int pcode) {
198200
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
199201
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
200202
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode
201-
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
202-
|| errorCode == ResultCodes.OB_GTS_NOT_READY.errorCode
203+
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
204+
|| errorCode == ResultCodes.OB_GTS_NOT_READY.errorCode
203205
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
204206
}
205207

206208
private boolean needFetchPartial(int errorCode) {
207209
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
208-
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
209-
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
210-
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
211-
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
212-
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
213-
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
214-
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
215-
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
216-
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
217-
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
218-
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
219-
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
220-
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
210+
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
211+
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
212+
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
213+
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
214+
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
215+
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
216+
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
217+
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
218+
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
219+
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
220+
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
221+
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
222+
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
221223
}
222224
}

src/main/java/com/alipay/oceanbase/rpc/checkandmutate/CheckAndInsUp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@ public class CheckAndInsUp {
3838
private String tableName;
3939
private ObTableFilter filter;
4040
private InsertOrUpdate insUp;
41-
private boolean checkExists = true;
41+
private boolean checkExists = true;
4242
private boolean rollbackWhenCheckFailed = false;
4343

4444
public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists)
4545
throws IllegalArgumentException {
4646
this(null, null, filter, insUp, check_exists, false);
4747
}
4848

49-
public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists, boolean rollbackWhenCheckFailed)
50-
throws IllegalArgumentException {
49+
public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists,
50+
boolean rollbackWhenCheckFailed) throws IllegalArgumentException {
5151
this(null, null, filter, insUp, check_exists, rollbackWhenCheckFailed);
5252
}
5353

@@ -57,8 +57,9 @@ public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
5757
this(client, tableName, filter, insUp, check_exists, false);
5858
}
5959

60-
public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,InsertOrUpdate insUp,
61-
boolean check_exists, boolean rollbackWhenCheckFailed) throws IllegalArgumentException {
60+
public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
61+
InsertOrUpdate insUp, boolean check_exists, boolean rollbackWhenCheckFailed)
62+
throws IllegalArgumentException {
6263
this.client = client;
6364
this.tableName = tableName;
6465
this.filter = filter;

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadConnection.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.alipay.oceanbase.rpc.property.Property;
3333
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
3434
import com.alipay.oceanbase.rpc.table.ObTable;
35+
import com.alipay.oceanbase.rpc.table.ObTableClientType;
3536

3637
public class ObDirectLoadConnection {
3738

@@ -186,8 +187,8 @@ private void initProtocol() throws ObDirectLoadException {
186187
properties.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(),
187188
String.valueOf(connectTimeout));
188189
table = new ObTable.Builder(ip, port)
189-
.setLoginInfo(tenantName, userName, password, databaseName)
190-
.setProperties(properties).build();
190+
.setLoginInfo(tenantName, userName, password, databaseName,
191+
ObTableClientType.JAVA_TABLE_CLIENT).setProperties(properties).build();
191192
} catch (Exception e) {
192193
throw new ObDirectLoadException(e);
193194
}
@@ -397,8 +398,8 @@ private void initTables() throws ObDirectLoadException {
397398
for (int i = 0; i < tables.length; ++i) {
398399
tables[i] = new ObTable.Builder(connection.ip, connection.port)
399400
.setLoginInfo(connection.tenantName, connection.userName,
400-
connection.password, connection.databaseName).setProperties(properties)
401-
.build();
401+
connection.password, connection.databaseName,
402+
ObTableClientType.JAVA_TABLE_CLIENT).setProperties(properties).build();
402403
}
403404
} catch (Exception e) {
404405
throw new ObDirectLoadException(e);

src/main/java/com/alipay/oceanbase/rpc/get/Get.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
118
package com.alipay.oceanbase.rpc.get;
219

320
import com.alipay.oceanbase.rpc.ObTableClient;
@@ -8,10 +25,10 @@
825
import java.util.Map;
926

1027
public class Get {
11-
private Table client;
12-
private String tableName;
13-
protected Row rowKey;
14-
protected String[] selectColumns;
28+
private Table client;
29+
private String tableName;
30+
protected Row rowKey;
31+
protected String[] selectColumns;
1532

1633
public Get() {
1734
tableName = null;
@@ -58,6 +75,6 @@ public Map<String, Object> execute() throws Exception {
5875
if (client == null) {
5976
throw new IllegalArgumentException("client is null");
6077
}
61-
return ((ObTableClient)client).get(tableName, rowKey, selectColumns);
78+
return ((ObTableClient) client).get(tableName, rowKey, selectColumns);
6279
}
6380
}

src/main/java/com/alipay/oceanbase/rpc/get/result/GetResult.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
118
package com.alipay.oceanbase.rpc.get.result;
219

320
import com.alipay.oceanbase.rpc.exception.ObTableException;

0 commit comments

Comments
 (0)