Skip to content

Commit 29f1623

Browse files
authored
Merge branch 'master' into get_partition_meta
2 parents 7ba0c39 + 72ceac7 commit 29f1623

23 files changed

+809
-67
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alipay.oceanbase.rpc;
1919

20+
import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
2021
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
2122
import com.alipay.oceanbase.rpc.table.AbstractTableBatchOps;
2223
import com.alipay.oceanbase.rpc.table.ObTableClientBatchOpsImpl;
@@ -190,7 +191,10 @@ void preCheck() {
190191
throw new IllegalArgumentException("operations is empty");
191192
}
192193
ObTableOperationType lastType = operations.get(0).getOperationType();
193-
if (returnOneResult
194+
if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) {
195+
throw new FeatureNotSupportedException(
196+
"returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]");
197+
} else if (returnOneResult
194198
&& !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT
195199
|| lastType == ObTableOperationType.PUT
196200
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,17 @@ public static boolean isLsOpSupport() {
8585
return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0;
8686
}
8787

88+
public static boolean isReturnOneResultSupport() {
89+
return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0 || OB_VERSION >= OB_VERSION_4_3_4_0;
90+
}
91+
8892
public static final long OB_VERSION_4_2_1_0 = calcVersion(4, (short) 2, (byte) 1, (byte) 0);
8993

9094
public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);
9195

9296
public static final long OB_VERSION_4_3_0_0 = calcVersion(4, (short) 3, (byte) 0, (byte) 0);
9397

98+
public static final long OB_VERSION_4_3_4_0 = calcVersion(4, (short) 3, (byte) 4, (byte) 0);
99+
94100
public static long OB_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
95101
}

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 115 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ public class ObTableClient extends AbstractObTableClient implements Lifecycle {
163163
private ConcurrentHashMap<String, String> TableGroupCache = new ConcurrentHashMap<String, String>(); // tableGroup -> Table
164164
private ConcurrentHashMap<String, String> TableGroupInverted = new ConcurrentHashMap<String, String>(); // Table -> tableGroup
165165

166+
private Long clientId;
167+
private Map<String, Object> TableConfigs = new HashMap<>();
166168
/*
167169
* Init.
168170
*/
@@ -175,9 +177,13 @@ public void init() throws Exception {
175177
if (initialized) {
176178
return;
177179
}
178-
// 1.init properties
180+
// 1. init clientId
181+
clientId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
182+
// 2. init table configs map
183+
initTableConfigs();
184+
// 3. init properties
179185
initProperties();
180-
// 2. init metadata
186+
// 4. init metadata
181187
initMetadata();
182188
initialized = true;
183189
} catch (Throwable t) {
@@ -255,6 +261,22 @@ public void checkStatus() throws IllegalStateException {
255261
}
256262
}
257263

264+
public Long getClientId() {
265+
return clientId;
266+
}
267+
268+
public Map<String, Object> getTableConfigs() {
269+
return TableConfigs;
270+
}
271+
272+
private void initTableConfigs() {
273+
TableConfigs.put("client_id", clientId);
274+
TableConfigs.put("runtime", new HashMap<String, String>());
275+
TableConfigs.put("log", new HashMap<String, String>());
276+
TableConfigs.put("route", new HashMap<String, String>());
277+
TableConfigs.put("thread_pool", new HashMap<String, Boolean>());
278+
}
279+
258280
private void initProperties() {
259281
rpcConnectTimeout = parseToInt(RPC_CONNECT_TIMEOUT.getKey(), rpcConnectTimeout);
260282

@@ -336,6 +358,52 @@ private void initProperties() {
336358

337359
slowQueryMonitorThreshold = parseToLong(SLOW_QUERY_MONITOR_THRESHOLD.getKey(),
338360
slowQueryMonitorThreshold);
361+
maxConnExpiredTime = parseToLong(MAX_CONN_EXPIRED_TIME.getKey(), maxConnExpiredTime);
362+
363+
364+
// add configs value to TableConfigs
365+
366+
// runtime
367+
Object value = TableConfigs.get("runtime");
368+
if (value instanceof Map) {
369+
Map<String, String> runtimeMap = (Map<String, String>) value;
370+
runtimeMap.put(RUNTIME_RETRY_TIMES.getKey(), String.valueOf(runtimeRetryTimes));
371+
runtimeMap.put(RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcExecuteTimeout));
372+
runtimeMap.put(RUNTIME_MAX_WAIT.getKey(), String.valueOf(runtimeMaxWait));
373+
runtimeMap.put(RUNTIME_RETRY_INTERVAL.getKey(), String.valueOf(runtimeRetryInterval));
374+
runtimeMap.put(RUNTIME_RETRY_TIMES.getKey(), String.valueOf(runtimeRetryTimes));
375+
runtimeMap.put(MAX_CONN_EXPIRED_TIME.getKey(), String.valueOf(maxConnExpiredTime));
376+
}
377+
// log
378+
value = TableConfigs.get("log");
379+
if (value instanceof Map) {
380+
Map<String, String> logMap = (Map<String, String>) value;
381+
logMap.put(SLOW_QUERY_MONITOR_THRESHOLD.getKey(), String.valueOf(slowQueryMonitorThreshold));
382+
}
383+
384+
value = TableConfigs.get("route");
385+
if (value instanceof Map) {
386+
Map<String, String> routeMap = (Map<String, String>) value;
387+
routeMap.put(METADATA_REFRESH_INTERVAL.getKey(), String.valueOf(metadataRefreshInterval));
388+
routeMap.put(RUNTIME_CONTINUOUS_FAILURE_CEILING.getKey(), String.valueOf(runtimeContinuousFailureCeiling));
389+
routeMap.put(SERVER_ADDRESS_CACHING_TIMEOUT.getKey(), String.valueOf(serverAddressCachingTimeout));
390+
routeMap.put(SERVER_ADDRESS_PRIORITY_TIMEOUT.getKey(), String.valueOf(serverAddressPriorityTimeout));
391+
routeMap.put(TABLE_ENTRY_ACQUIRE_CONNECT_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireConnectTimeout));
392+
routeMap.put(TABLE_ENTRY_ACQUIRE_SOCKET_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireSocketTimeout));
393+
routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_BASE.getKey(), String.valueOf(tableEntryRefreshIntervalBase));
394+
routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_CEILING.getKey(), String.valueOf(tableEntryRefreshIntervalCeiling));
395+
routeMap.put(TABLE_ENTRY_REFRESH_TRY_TIMES.getKey(), String.valueOf(tableEntryRefreshTryTimes));
396+
}
397+
Boolean useExecutor = false;
398+
if (runtimeBatchExecutor != null) {
399+
useExecutor = true;
400+
}
401+
402+
value = TableConfigs.get("thread_pool");
403+
if (value instanceof Map) {
404+
Map<String, Boolean> threadPoolMap = (Map<String, Boolean>) value;
405+
threadPoolMap.put(RUNTIME_BATCH_EXECUTOR.getKey(), useExecutor);
406+
}
339407
}
340408

341409
private void initMetadata() throws Exception {
@@ -345,7 +413,7 @@ private void initMetadata() throws Exception {
345413
try {
346414
odpTable = new ObTable.Builder(odpAddr, odpPort) //
347415
.setLoginInfo(tenantName, fullUserName, password, database) //
348-
.setProperties(getProperties()).build();
416+
.setProperties(getProperties()).setConfigs(TableConfigs).build();
349417
} catch (Exception e) {
350418
logger
351419
.warn(
@@ -398,7 +466,7 @@ private void initMetadata() throws Exception {
398466
try {
399467
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
400468
.setLoginInfo(tenantName, userName, password, database) //
401-
.setProperties(getProperties()).build();
469+
.setProperties(getProperties()).setConfigs(TableConfigs).build();
402470
tableRoster.put(addr, obTable);
403471
servers.add(addr);
404472
} catch (Exception e) {
@@ -529,6 +597,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
529597
}
530598
boolean needRefreshTableEntry = false;
531599
boolean needRenew = false;
600+
boolean needFetchAllRouteInfo = false;
532601
int tryTimes = 0;
533602
long startExecute = System.currentTimeMillis();
534603
while (true) {
@@ -547,8 +616,8 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
547616
if (odpMode) {
548617
obPair = getODPTableWithRowKeyValue(tableName, callback.getRowKey(), needRenew);
549618
} else {
550-
obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry,
551-
tableEntryRefreshIntervalWait, route);
619+
obPair = getTableBySingleRowKeyWithRoute(tableName, callback.getRowKey(),
620+
needRefreshTableEntry, tableEntryRefreshIntervalWait, needFetchAllRouteInfo, route);
552621
}
553622
T t = callback.execute(obPair);
554623
resetExecuteContinuousFailureCount(tableName);
@@ -606,6 +675,11 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
606675
"retry while meet Exception needing refresh, errorCode: {} , errorMsg: {},retry times {}",
607676
((ObTableException) ex).getErrorCode(), ex.getMessage(),
608677
tryTimes);
678+
if (ex instanceof ObTableNeedFetchAllException) {
679+
needFetchAllRouteInfo = true;
680+
// reset failure count while fetch all route info
681+
this.resetExecuteContinuousFailureCount(tableName);
682+
}
609683
} else {
610684
calculateContinuousFailure(tableName, ex.getMessage());
611685
throw ex;
@@ -694,6 +768,7 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
694768
}
695769
boolean needRefreshTableEntry = false;
696770
boolean needRenew = false;
771+
boolean needFetchAllRouteInfo = false;
697772
int tryTimes = 0;
698773
long startExecute = System.currentTimeMillis();
699774
while (true) {
@@ -718,8 +793,7 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
718793
tableEntryRefreshIntervalWait, route);
719794
} else if (null != callback.getKeyRanges()) {
720795
// using scan range
721-
obPair = getTable(tableName, new ObTableQuery(), callback.getKeyRanges(),
722-
needRefreshTableEntry, tableEntryRefreshIntervalWait, route);
796+
obPair = getTableByRowKeyRange(tableName, new ObTableQuery(), callback.getKeyRanges());
723797
} else {
724798
throw new ObTableException("rowkey and scan range are null in mutation");
725799
}
@@ -781,6 +855,11 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
781855
"retry while meet Exception needing refresh, errorCode: {} , errorMsg: {},retry times {}",
782856
((ObTableException) ex).getErrorCode(), ex.getMessage(),
783857
tryTimes);
858+
if (ex instanceof ObTableNeedFetchAllException) {
859+
needFetchAllRouteInfo = true;
860+
// reset failure count while fetch all route info
861+
this.resetExecuteContinuousFailureCount(tableName);
862+
}
784863
} else {
785864
calculateContinuousFailure(tableName, ex.getMessage());
786865
throw ex;
@@ -1060,6 +1139,19 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexTableName, boolean fo
10601139
}
10611140
}
10621141

1142+
/**
1143+
* Get or refresh table entry.
1144+
* @param tableName table name
1145+
* @param refresh is re-fresh
1146+
* @param waitForRefresh wait re-fresh
1147+
* @return this
1148+
* @throws Exception if fail
1149+
*/
1150+
public TableEntry getOrRefreshTableEntry(final String tableName, final boolean refresh,
1151+
final boolean waitForRefresh) throws Exception {
1152+
return getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
1153+
}
1154+
10631155
/**
10641156
* Get or refresh table entry.
10651157
* @param tableName table name
@@ -1455,13 +1547,15 @@ private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
14551547
* @param rowKey row key
14561548
* @param refresh whether to refresh
14571549
* @param waitForRefresh whether wait for refresh
1550+
* @param needFetchAll whether need fetch all
14581551
* @return ObPair of partId and table
14591552
* @throws Exception exception
14601553
*/
1461-
public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, boolean refresh,
1462-
boolean waitForRefresh) throws Exception {
1554+
public ObPair<Long, ObTableParam> getTableBySingleRowKey(String tableName, Object[] rowKey,
1555+
boolean refresh, boolean waitForRefresh, boolean needFetchAll)
1556+
throws Exception {
14631557
ObServerRoute route = getRoute(false);
1464-
return getTable(tableName, rowKey, refresh, waitForRefresh, route);
1558+
return getTableBySingleRowKeyWithRoute(tableName, rowKey, refresh, waitForRefresh, needFetchAll, route);
14651559
}
14661560

14671561
/**
@@ -1474,10 +1568,14 @@ public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, bo
14741568
* @return ObPair of partId and table
14751569
* @throws Exception exception
14761570
*/
1477-
public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, boolean refresh,
1478-
boolean waitForRefresh, ObServerRoute route)
1479-
throws Exception {
1480-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
1571+
public ObPair<Long, ObTableParam> getTableBySingleRowKeyWithRoute(String tableName,
1572+
Object[] rowKey,
1573+
boolean refresh,
1574+
boolean waitForRefresh,
1575+
boolean needFetchAll,
1576+
ObServerRoute route)
1577+
throws Exception {
1578+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, needFetchAll);
14811579
Row row = new Row();
14821580
if (tableEntry.isPartitionTable()
14831581
&& tableEntry.getPartitionInfo().getLevel() != ObPartitionLevel.LEVEL_ZERO) {
@@ -1547,15 +1645,11 @@ public ObPair<Long, ObTableParam> getODPTableWithRowKeyValue(String tableName, O
15471645
* @param tableName table want to get
15481646
* @param query query
15491647
* @param keyRanges key
1550-
* @param refresh whether to refresh
1551-
* @param waitForRefresh whether wait for refresh
1552-
* @param route ObServer route
15531648
* @return ObPair of partId and table
15541649
* @throws Exception exception
15551650
*/
1556-
public ObPair<Long, ObTableParam> getTable(String tableName, ObTableQuery query, List<ObNewRange> keyRanges, boolean refresh,
1557-
boolean waitForRefresh, ObServerRoute route)
1558-
throws Exception {
1651+
public ObPair<Long, ObTableParam> getTableByRowKeyRange(String tableName, ObTableQuery query, List<ObNewRange> keyRanges)
1652+
throws Exception {
15591653
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
15601654
for (ObNewRange rang : keyRanges) {
15611655
ObRowKey startKey = rang.getStartKey();

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alipay.oceanbase.rpc.bolt.transport;
1919

20+
import com.alibaba.fastjson.JSONObject;
2021
import com.alipay.oceanbase.rpc.ObGlobal;
2122
import com.alipay.oceanbase.rpc.exception.*;
2223
import com.alipay.oceanbase.rpc.location.LocationUtil;
@@ -28,6 +29,7 @@
2829
import org.slf4j.Logger;
2930

3031
import java.net.ConnectException;
32+
import java.time.LocalDateTime;
3133
import java.util.concurrent.atomic.AtomicBoolean;
3234
import java.util.concurrent.atomic.AtomicLong;
3335

@@ -45,13 +47,32 @@ public class ObTableConnection {
4547
private long uniqueId; // as trace0 in rpc header
4648
private AtomicLong sequence; // as trace1 in rpc header
4749
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
48-
50+
private AtomicBoolean isExpired = new AtomicBoolean(false);
51+
private LocalDateTime lastConnectionTime;
52+
private boolean loginWithConfigs = false;
4953
public static long ipToLong(String strIp) {
5054
String[] ip = strIp.split("\\.");
5155
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
5256
+ (Long.parseLong(ip[2]) << 8) + (Long.parseLong(ip[3]));
5357
}
5458

59+
public boolean checkExpired() {
60+
long maxConnectionTimes = obTable.getConnMaxExpiredTime();
61+
return lastConnectionTime.isBefore(LocalDateTime.now().minusMinutes(maxConnectionTimes));
62+
}
63+
64+
public boolean isExpired() {
65+
return isExpired.get();
66+
}
67+
68+
public void setExpired(boolean expired) {
69+
isExpired.set(expired);
70+
}
71+
72+
73+
public void enableLoginWithConfigs() {
74+
loginWithConfigs = true;
75+
}
5576
/*
5677
* Ob table connection.
5778
*/
@@ -67,18 +88,6 @@ public void init() throws Exception {
6788
// sequence is a monotone increasing long value inside each connection
6889
sequence = new AtomicLong();
6990
connect();
70-
/* layout of uniqueId(64 bytes)
71-
* ip_: 32
72-
* port_: 16;
73-
* is_user_request_: 1;
74-
* is_ipv6_:1;
75-
* reserved_: 14;
76-
*/
77-
long ip = ipToLong(connection.getLocalIP());
78-
long port = (long) connection.getLocalPort() << 32;
79-
long isUserRequest = (1l << (32 + 16));
80-
long reserved = 0;
81-
uniqueId = ip | port | isUserRequest | reserved;
8291
}
8392

8493
private boolean connect() throws Exception {
@@ -113,7 +122,20 @@ private boolean connect() throws Exception {
113122

114123
// login the server. If login failed, close the raw connection to make the connection creation atomic.
115124
try {
125+
/* layout of uniqueId(64 bytes)
126+
* ip_: 32
127+
* port_: 16;
128+
* is_user_request_: 1;
129+
* is_ipv6_:1;
130+
* reserved_: 14;
131+
*/
132+
long ip = ipToLong(connection.getLocalIP());
133+
long port = (long) connection.getLocalPort() << 32;
134+
long isUserRequest = (1l << (32 + 16));
135+
long reserved = 0;
136+
uniqueId = ip | port | isUserRequest | reserved;
116137
login();
138+
lastConnectionTime = LocalDateTime.now();
117139
} catch (Exception e) {
118140
close();
119141
throw e;
@@ -127,6 +149,11 @@ private void login() throws Exception {
127149
request.setTenantName(obTable.getTenantName());
128150
request.setUserName(obTable.getUserName());
129151
request.setDatabaseName(obTable.getDatabase());
152+
if (loginWithConfigs) {
153+
JSONObject json = new JSONObject(obTable.getConfigs());
154+
request.setConfigsStr(json.toJSONString());
155+
loginWithConfigs = false;
156+
}
130157
generatePassSecret(request);
131158
ObTableLoginResult result;
132159

0 commit comments

Comments
 (0)