Skip to content

Commit 56fd174

Browse files
Support Client Parameters in loginRequest; Periodically Refresh and Reconnect Expired Client Connections (#160)
* Implement JSON string transmission for configuration * Implement JSON string transmission for configuration * expire connection * remove unused import * fix TableConfigs field * Adjust connection timeout configuration * fix: Reduce the frequency of sending config during login * fix login trace id is empty * change client id type to long * add config to control expired connection * add client info view test case * modify default value of MAX_CONN_EXPIRED_TIME * add rpc.execute.timeout in runtime config * add rpc.execute.timeout in runtime config * fix conn relogin invalid when MAX_CONN_EXPIRED_TIME > 2 * remove getConnection lock --------- Co-authored-by: GroundWu <[email protected]>
1 parent de9d2e6 commit 56fd174

File tree

10 files changed

+406
-29
lines changed

10 files changed

+406
-29
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ public class ObTableClient extends AbstractObTableClient implements Lifecycle {
155155
private ConcurrentHashMap<String, String> TableGroupCache = new ConcurrentHashMap<String, String>(); // tableGroup -> Table
156156
private ConcurrentHashMap<String, String> TableGroupInverted = new ConcurrentHashMap<String, String>(); // Table -> tableGroup
157157

158+
private Long clientId;
159+
private Map<String, Object> TableConfigs = new HashMap<>();
158160
/*
159161
* Init.
160162
*/
@@ -167,9 +169,13 @@ public void init() throws Exception {
167169
if (initialized) {
168170
return;
169171
}
170-
// 1.init properties
172+
// 1. init clientId
173+
clientId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
174+
// 2. init table configs map
175+
initTableConfigs();
176+
// 3. init properties
171177
initProperties();
172-
// 2. init metadata
178+
// 4. init metadata
173179
initMetadata();
174180
initialized = true;
175181
} catch (Throwable t) {
@@ -247,6 +253,22 @@ public void checkStatus() throws IllegalStateException {
247253
}
248254
}
249255

256+
public Long getClientId() {
257+
return clientId;
258+
}
259+
260+
public Map<String, Object> getTableConfigs() {
261+
return TableConfigs;
262+
}
263+
264+
private void initTableConfigs() {
265+
TableConfigs.put("client_id", clientId);
266+
TableConfigs.put("runtime", new HashMap<String, String>());
267+
TableConfigs.put("log", new HashMap<String, String>());
268+
TableConfigs.put("route", new HashMap<String, String>());
269+
TableConfigs.put("thread_pool", new HashMap<String, Boolean>());
270+
}
271+
250272
private void initProperties() {
251273
rpcConnectTimeout = parseToInt(RPC_CONNECT_TIMEOUT.getKey(), rpcConnectTimeout);
252274

@@ -328,6 +350,52 @@ private void initProperties() {
328350

329351
slowQueryMonitorThreshold = parseToLong(SLOW_QUERY_MONITOR_THRESHOLD.getKey(),
330352
slowQueryMonitorThreshold);
353+
maxConnExpiredTime = parseToLong(MAX_CONN_EXPIRED_TIME.getKey(), maxConnExpiredTime);
354+
355+
356+
// add configs value to TableConfigs
357+
358+
// runtime
359+
Object value = TableConfigs.get("runtime");
360+
if (value instanceof Map) {
361+
Map<String, String> runtimeMap = (Map<String, String>) value;
362+
runtimeMap.put(RUNTIME_RETRY_TIMES.getKey(), String.valueOf(runtimeRetryTimes));
363+
runtimeMap.put(RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcExecuteTimeout));
364+
runtimeMap.put(RUNTIME_MAX_WAIT.getKey(), String.valueOf(runtimeMaxWait));
365+
runtimeMap.put(RUNTIME_RETRY_INTERVAL.getKey(), String.valueOf(runtimeRetryInterval));
366+
runtimeMap.put(RUNTIME_RETRY_TIMES.getKey(), String.valueOf(runtimeRetryTimes));
367+
runtimeMap.put(MAX_CONN_EXPIRED_TIME.getKey(), String.valueOf(maxConnExpiredTime));
368+
}
369+
// log
370+
value = TableConfigs.get("log");
371+
if (value instanceof Map) {
372+
Map<String, String> logMap = (Map<String, String>) value;
373+
logMap.put(SLOW_QUERY_MONITOR_THRESHOLD.getKey(), String.valueOf(slowQueryMonitorThreshold));
374+
}
375+
376+
value = TableConfigs.get("route");
377+
if (value instanceof Map) {
378+
Map<String, String> routeMap = (Map<String, String>) value;
379+
routeMap.put(METADATA_REFRESH_INTERVAL.getKey(), String.valueOf(metadataRefreshInterval));
380+
routeMap.put(RUNTIME_CONTINUOUS_FAILURE_CEILING.getKey(), String.valueOf(runtimeContinuousFailureCeiling));
381+
routeMap.put(SERVER_ADDRESS_CACHING_TIMEOUT.getKey(), String.valueOf(serverAddressCachingTimeout));
382+
routeMap.put(SERVER_ADDRESS_PRIORITY_TIMEOUT.getKey(), String.valueOf(serverAddressPriorityTimeout));
383+
routeMap.put(TABLE_ENTRY_ACQUIRE_CONNECT_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireConnectTimeout));
384+
routeMap.put(TABLE_ENTRY_ACQUIRE_SOCKET_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireSocketTimeout));
385+
routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_BASE.getKey(), String.valueOf(tableEntryRefreshIntervalBase));
386+
routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_CEILING.getKey(), String.valueOf(tableEntryRefreshIntervalCeiling));
387+
routeMap.put(TABLE_ENTRY_REFRESH_TRY_TIMES.getKey(), String.valueOf(tableEntryRefreshTryTimes));
388+
}
389+
Boolean useExecutor = false;
390+
if (runtimeBatchExecutor != null) {
391+
useExecutor = true;
392+
}
393+
394+
value = TableConfigs.get("thread_pool");
395+
if (value instanceof Map) {
396+
Map<String, Boolean> threadPoolMap = (Map<String, Boolean>) value;
397+
threadPoolMap.put(RUNTIME_BATCH_EXECUTOR.getKey(), useExecutor);
398+
}
331399
}
332400

333401
private void initMetadata() throws Exception {
@@ -337,7 +405,7 @@ private void initMetadata() throws Exception {
337405
try {
338406
odpTable = new ObTable.Builder(odpAddr, odpPort) //
339407
.setLoginInfo(tenantName, fullUserName, password, database) //
340-
.setProperties(getProperties()).build();
408+
.setProperties(getProperties()).setConfigs(TableConfigs).build();
341409
} catch (Exception e) {
342410
logger
343411
.warn(
@@ -390,7 +458,7 @@ private void initMetadata() throws Exception {
390458
try {
391459
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
392460
.setLoginInfo(tenantName, userName, password, database) //
393-
.setProperties(getProperties()).build();
461+
.setProperties(getProperties()).setConfigs(TableConfigs).build();
394462
tableRoster.put(addr, obTable);
395463
servers.add(addr);
396464
} catch (Exception e) {

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alipay.oceanbase.rpc.bolt.transport;
1919

20+
import com.alibaba.fastjson.JSONObject;
2021
import com.alipay.oceanbase.rpc.ObGlobal;
2122
import com.alipay.oceanbase.rpc.exception.*;
2223
import com.alipay.oceanbase.rpc.location.LocationUtil;
@@ -28,6 +29,7 @@
2829
import org.slf4j.Logger;
2930

3031
import java.net.ConnectException;
32+
import java.time.LocalDateTime;
3133
import java.util.concurrent.atomic.AtomicBoolean;
3234
import java.util.concurrent.atomic.AtomicLong;
3335

@@ -45,13 +47,32 @@ public class ObTableConnection {
4547
private long uniqueId; // as trace0 in rpc header
4648
private AtomicLong sequence; // as trace1 in rpc header
4749
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
48-
50+
private AtomicBoolean isExpired = new AtomicBoolean(false);
51+
private LocalDateTime lastConnectionTime;
52+
private boolean loginWithConfigs = false;
4953
public static long ipToLong(String strIp) {
5054
String[] ip = strIp.split("\\.");
5155
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
5256
+ (Long.parseLong(ip[2]) << 8) + (Long.parseLong(ip[3]));
5357
}
5458

59+
public boolean checkExpired() {
60+
long maxConnectionTimes = obTable.getConnMaxExpiredTime();
61+
return lastConnectionTime.isBefore(LocalDateTime.now().minusMinutes(maxConnectionTimes));
62+
}
63+
64+
public boolean isExpired() {
65+
return isExpired.get();
66+
}
67+
68+
public void setExpired(boolean expired) {
69+
isExpired.set(expired);
70+
}
71+
72+
73+
public void enableLoginWithConfigs() {
74+
loginWithConfigs = true;
75+
}
5576
/*
5677
* Ob table connection.
5778
*/
@@ -67,18 +88,6 @@ public void init() throws Exception {
6788
// sequence is a monotone increasing long value inside each connection
6889
sequence = new AtomicLong();
6990
connect();
70-
/* layout of uniqueId(64 bytes)
71-
* ip_: 32
72-
* port_: 16;
73-
* is_user_request_: 1;
74-
* is_ipv6_:1;
75-
* reserved_: 14;
76-
*/
77-
long ip = ipToLong(connection.getLocalIP());
78-
long port = (long) connection.getLocalPort() << 32;
79-
long isUserRequest = (1l << (32 + 16));
80-
long reserved = 0;
81-
uniqueId = ip | port | isUserRequest | reserved;
8291
}
8392

8493
private boolean connect() throws Exception {
@@ -113,7 +122,20 @@ private boolean connect() throws Exception {
113122

114123
// login the server. If login failed, close the raw connection to make the connection creation atomic.
115124
try {
125+
/* layout of uniqueId(64 bytes)
126+
* ip_: 32
127+
* port_: 16;
128+
* is_user_request_: 1;
129+
* is_ipv6_:1;
130+
* reserved_: 14;
131+
*/
132+
long ip = ipToLong(connection.getLocalIP());
133+
long port = (long) connection.getLocalPort() << 32;
134+
long isUserRequest = (1l << (32 + 16));
135+
long reserved = 0;
136+
uniqueId = ip | port | isUserRequest | reserved;
116137
login();
138+
lastConnectionTime = LocalDateTime.now();
117139
} catch (Exception e) {
118140
close();
119141
throw e;
@@ -127,6 +149,11 @@ private void login() throws Exception {
127149
request.setTenantName(obTable.getTenantName());
128150
request.setUserName(obTable.getUserName());
129151
request.setDatabaseName(obTable.getDatabase());
152+
if (loginWithConfigs) {
153+
JSONObject json = new JSONObject(obTable.getConfigs());
154+
request.setConfigsStr(json.toJSONString());
155+
loginWithConfigs = false;
156+
}
130157
generatePassSecret(request);
131158
ObTableLoginResult result;
132159

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ public enum Property {
139139
/*
140140
* other config
141141
*/
142-
RUNTIME_BATCH_EXECUTOR("runtime.batch.executor", null, "批量请求时并发执行的线程池");
142+
RUNTIME_BATCH_EXECUTOR("runtime.batch.executor", null, "批量请求时并发执行的线程池"),
143+
144+
MAX_CONN_EXPIRED_TIME("connection.max.expired.time", 8L, "客户端连接最大过期时间(min)");
143145

144146
private final String key;
145147
private final Object defaultV;

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public abstract class ObTableAbstractOperationRequest extends AbstractPayload im
3434
protected String tableName; // table name. 待访问的表名
3535
protected long tableId = Constants.OB_INVALID_ID; // table id. 如果知道表id,可以用于优化,如果不知道,设定为OB_INVALID_ID
3636
protected long partitionId = Constants.INVALID_TABLET_ID; // Constants.OB_INVALID_ID; // partition id / tabletId. 如果知道表分区id,可以用于优化,如果不知道,设定为OB_INVALID_ID
37-
protected ObTableEntityType entityType = ObTableEntityType.KV ; // entity type. 如果明确entity类型,可以用于优化,如果不知道,设定为ObTableEntityType::DYNAMIC
37+
protected ObTableEntityType entityType = ObTableEntityType.KV; // entity type. 如果明确entity类型,可以用于优化,如果不知道,设定为ObTableEntityType::DYNAMIC
3838
protected ObTableConsistencyLevel consistencyLevel = ObTableConsistencyLevel.STRONG; // read consistency level. 读一致性,是否要强一致性等(必须读到刚写入的数据). 目前只支持STRONG.
3939
protected ObTableOptionFlag option_flag = ObTableOptionFlag.DEFAULT;
4040
protected boolean returningAffectedEntity = false;

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public int getPcode() {
5353
private ObBytesString passScramble; // 20 字节随机字符串
5454
private String databaseName;
5555
private long ttlUs;
56+
private String configsStr;
5657

5758
/*
5859
* Ob table login request.
@@ -112,7 +113,9 @@ public byte[] encode() {
112113

113114
len = Serialization.getNeedBytes(ttlUs);
114115
System.arraycopy(Serialization.encodeVi64(ttlUs), 0, bytes, idx, len);
115-
116+
idx += len;
117+
strbytes = Serialization.encodeVString(configsStr);
118+
System.arraycopy(strbytes, 0, bytes, idx, strbytes.length);
116119
return bytes;
117120
}
118121

@@ -164,7 +167,8 @@ public long getPayloadContentSize() {
164167
+ Serialization.getNeedBytes(reversed3) + Serialization.getNeedBytes(tenantName)
165168
+ Serialization.getNeedBytes(userName) + Serialization.getNeedBytes(passSecret)
166169
+ Serialization.getNeedBytes(passScramble)
167-
+ Serialization.getNeedBytes(databaseName) + Serialization.getNeedBytes(ttlUs);
170+
+ Serialization.getNeedBytes(databaseName) + Serialization.getNeedBytes(ttlUs)
171+
+ Serialization.getNeedBytes(configsStr);
168172
}
169173

170174
/*
@@ -369,5 +373,13 @@ public long getTtlUs() {
369373
public void setTtlUs(long ttlUs) {
370374
this.ttlUs = ttlUs;
371375
}
376+
377+
public String getConfigsStr() {
378+
return configsStr;
379+
}
380+
381+
public void setConfigsStr(String configsStr) {
382+
this.configsStr = configsStr;
383+
}
372384

373385
}

src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public abstract class AbstractObTable extends AbstractTable {
4141

4242
protected int nettyBlockingWaitInterval = NETTY_BLOCKING_WAIT_INTERVAL.getDefaultInt();
4343

44+
protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong();
45+
4446
/*
4547
* Get ob table connect try times.
4648
*/
@@ -159,4 +161,9 @@ public int getNettyBufferHighWatermark() {
159161
public int getNettyBlockingWaitInterval() {
160162
return nettyBlockingWaitInterval;
161163
}
164+
165+
/*
166+
* Get connection max expired time
167+
*/
168+
public long getConnMaxExpiredTime() { return maxConnExpiredTime; }
162169
}

src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public abstract class AbstractObTableClient extends AbstractTable {
9494
.getDefaultInt();
9595
protected long slowQueryMonitorThreshold = SLOW_QUERY_MONITOR_THRESHOLD
9696
.getDefaultLong();
97+
protected Long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong();
9798

9899
@Deprecated
99100
/*

0 commit comments

Comments
 (0)