Skip to content

Commit e9fc9d9

Browse files
authored
Background keep-alive mechannic (#361)
* implement background keep alive mechanic in RouteTableRefresher * add log * fix check expired nullpointer, add login-failed ip into suspect list * add suspect ip first and refresh tablet location after * do not add suspect ip in odp mode
1 parent 83802ff commit e9fc9d9

File tree

10 files changed

+302
-45
lines changed

10 files changed

+302
-45
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -581,12 +581,11 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
581581
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
582582
logger.debug("client execute meet transport timeout, obTable ip:port is {}:{}",
583583
tableParam.getObTable().getIp(), tableParam.getObTable().getPort());
584-
syncRefreshMetadata(true);
585584
TableEntry entry = tableRoute.getTableEntry(tableName);
586585
long partId = tableRoute.getPartId(entry, rowKey);
587586
long tabletId = tableRoute.getTabletIdByPartId(entry, partId);
588-
tableRoute.refreshPartitionLocation(tableName, tabletId, entry);
589587
tableParam.getObTable().setDirty();
588+
dealWithRpcTimeoutForSingleTablet(tableParam.getObTable().getObServerAddr(), tableName, tabletId);
590589
}
591590
calculateContinuousFailure(tableName, ex.getMessage());
592591
throw ex;
@@ -810,12 +809,11 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
810809
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
811810
logger.debug("client execute meet transport timeout, obTable ip:port is {}:{}",
812811
tableParam.getObTable().getIp(), tableParam.getObTable().getPort());
813-
syncRefreshMetadata(true);
814812
TableEntry entry = tableRoute.getTableEntry(tableName);
815813
long partId = tableRoute.getPartId(entry, callback.getRowKey());
816814
long tabletId = tableRoute.getTabletIdByPartId(entry, partId);
817-
tableRoute.refreshPartitionLocation(tableName, tabletId, entry);
818815
tableParam.getObTable().setDirty();
816+
dealWithRpcTimeoutForSingleTablet(tableParam.getObTable().getObServerAddr(), tableName, tabletId);
819817
}
820818
calculateContinuousFailure(tableName, ex.getMessage());
821819
throw ex;
@@ -1145,7 +1143,7 @@ public ObTable addTable(ObServerAddr addr){
11451143
logger.info("server from response not exist in route cache, server ip {}, port {} , execute add Table.", addr.getIp(), addr.getSvrPort());
11461144
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
11471145
.setLoginInfo(tenantName, userName, password, database, getClientType(runningMode)) //
1148-
.setProperties(getProperties()).build();
1146+
.setProperties(getProperties()).setObServerAddr(addr).build();
11491147
tableRoster.put(addr, obTable);
11501148
return obTable;
11511149
} catch (Exception e) {
@@ -1183,6 +1181,17 @@ public Row transformToRow(String tableName, Object[] rowkey) throws Exception {
11831181
return row;
11841182
}
11851183

1184+
public void dealWithRpcTimeoutForSingleTablet(ObServerAddr addr, String tableName, long tabletId) throws Exception {
1185+
RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer(addr);
1186+
RouteTableRefresher.addIntoSuspectIPs(suspectAddr);
1187+
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
1188+
}
1189+
public void dealWithRpcTimeoutForBatchTablet(ObServerAddr addr, String tableName) throws Exception {
1190+
RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer(addr);
1191+
RouteTableRefresher.addIntoSuspectIPs(suspectAddr);
1192+
tableRoute.refreshTabletLocationBatch(tableName);
1193+
}
1194+
11861195
/**
11871196
* get table name with table group
11881197
* @param tableGroupName table group name

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alipay.oceanbase.rpc.ObGlobal;
2121
import com.alipay.oceanbase.rpc.exception.*;
2222
import com.alipay.oceanbase.rpc.location.LocationUtil;
23+
import com.alipay.oceanbase.rpc.location.model.RouteTableRefresher;
2324
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginRequest;
2425
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult;
2526
import com.alipay.oceanbase.rpc.table.ObTable;
@@ -115,6 +116,10 @@ private boolean connect() throws Exception {
115116
MONITOR.info(logMessage(null, "CONNECT", endpoint, System.currentTimeMillis() - start));
116117

117118
if (tries >= maxTryTimes) {
119+
if (!obTable.isOdpMode()) {
120+
RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer(obTable.getObServerAddr());
121+
RouteTableRefresher.addIntoSuspectIPs(suspectAddr);
122+
}
118123
LOGGER.warn("connect failed after max " + maxTryTimes + " tries "
119124
+ TraceUtil.formatIpPort(obTable));
120125
throw new ObTableServerConnectException("connect failed after max " + maxTryTimes
@@ -252,7 +257,7 @@ public void reConnectAndLogin(String msg) throws ObTableException {
252257
try {
253258
// 1. check the connection is available, force to close it
254259
if (checkAvailable()) {
255-
LOGGER.warn("The connection would be closed and reconnected if: "
260+
LOGGER.warn("The connection would be closed and reconnected is: "
256261
+ connection.getUrl());
257262
close();
258263
}

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ public static List<ObServerLdcItem> getServerLdc(ServerRoster serverRoster,
305305
* @param socketTimeout
306306
* @return
307307
*/
308-
private static String formatObServerUrl(ObServerAddr obServerAddr, long connectTimeout,
308+
public static String formatObServerUrl(ObServerAddr obServerAddr, long connectTimeout,
309309
long socketTimeout) {
310310
return format(
311311
"jdbc:mysql://%s/oceanbase?useUnicode=true&characterEncoding=utf-8&connectTimeout=%d&socketTimeout=%d",
@@ -319,7 +319,7 @@ private static String formatObServerUrl(ObServerAddr obServerAddr, long connectT
319319
* @return
320320
* @throws ObTableEntryRefreshException
321321
*/
322-
private static Connection getMetaRefreshConnection(String url, ObUserAuth sysUA)
322+
public static Connection getMetaRefreshConnection(String url, ObUserAuth sysUA)
323323
throws ObTableEntryRefreshException {
324324

325325
try {

src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java

Lines changed: 232 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,70 @@
1616
*/
1717
package com.alipay.oceanbase.rpc.location.model;
1818

19+
import java.sql.Connection;
20+
import java.sql.ResultSet;
21+
import java.sql.SQLException;
22+
import java.sql.Statement;
1923
import java.util.*;
24+
import java.util.concurrent.ConcurrentHashMap;
2025
import java.util.concurrent.Executors;
2126
import java.util.concurrent.ScheduledExecutorService;
2227
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.locks.Lock;
29+
import java.util.concurrent.locks.ReentrantLock;
2330

2431
import com.alipay.oceanbase.rpc.ObTableClient;
32+
import com.alipay.oceanbase.rpc.exception.ObTableEntryRefreshException;
33+
import com.alipay.oceanbase.rpc.exception.ObTableTryLockTimeoutException;
34+
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
2535
import com.alipay.oceanbase.rpc.location.LocationUtil;
2636
import org.slf4j.Logger;
37+
2738
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.getLogger;
2839

2940
public class RouteTableRefresher {
3041

31-
private static final Logger logger = getLogger(RouteTableRefresher.class);
42+
private static final Logger logger = getLogger(RouteTableRefresher.class);
43+
44+
private static final String sql = "select 'detect server alive' from dual";
45+
46+
private final ObTableClient tableClient;
47+
48+
private final ObUserAuth sysUA;
49+
50+
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
51+
52+
private final static ConcurrentHashMap<ObServerAddr, Lock> suspectLocks = new ConcurrentHashMap<>(); // ObServer -> access lock
3253

33-
private final ObTableClient tableClient;
54+
private final static ConcurrentHashMap<ObServerAddr, SuspectObServer> suspectServers = new ConcurrentHashMap<>(); // ObServer -> information structure
3455

35-
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
56+
private final static HashMap<ObServerAddr, Long> serverLastAccessTimestamps = new HashMap<>(); // ObServer -> last access timestamp
3657

37-
public RouteTableRefresher(ObTableClient tableClient) {
58+
public RouteTableRefresher(ObTableClient tableClient, ObUserAuth sysUA) {
3859
this.tableClient = tableClient;
60+
this.sysUA = sysUA;
61+
}
62+
63+
/**
64+
* check whether observers have changed every 30 seconds
65+
* if changed, refresh in the background
66+
* */
67+
public void start() {
68+
scheduler.scheduleAtFixedRate(this::doRsListCheck, 30, 30, TimeUnit.SECONDS);
69+
scheduler.scheduleWithFixedDelay(this::doCheckAliveTask, 1, 1, TimeUnit.SECONDS);
70+
}
71+
72+
public void close() {
73+
try {
74+
scheduler.shutdown();
75+
// wait at most 1 seconds to close the scheduler
76+
if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
77+
scheduler.shutdownNow();
78+
}
79+
} catch (InterruptedException e) {
80+
logger.warn("scheduler await for terminate interrupted: {}.", e.getMessage());
81+
scheduler.shutdownNow();
82+
}
3983
}
4084

4185
/**
@@ -86,25 +130,195 @@ private void doRsListCheck() {
86130
}
87131
}
88132

89-
/**
90-
* check whether observers have changed every 30 seconds
91-
* if changed, refresh in the background
92-
* */
93-
public void start() {
94-
scheduler.scheduleAtFixedRate(this::doRsListCheck, 30, 30, TimeUnit.SECONDS);
133+
private void doCheckAliveTask() {
134+
for (Map.Entry<ObServerAddr, SuspectObServer> entry : suspectServers.entrySet()) {
135+
try {
136+
checkAlive(entry.getKey());
137+
} catch (Exception e) {
138+
// silence resolving
139+
logger.warn("RouteTableRefresher::doCheckAliveTask fail, failed server: {}", entry.getKey().toString());
140+
}
141+
}
95142
}
96143

97-
public void close() {
144+
private void checkAlive(ObServerAddr addr) {
145+
long connectTimeout = 1000L; // 1s
146+
long socketTimeout = 5000L; // 5s
147+
String url = LocationUtil.formatObServerUrl(addr, connectTimeout, socketTimeout);
148+
Connection connection = null;
149+
Statement statement = null;
150+
ResultSet rs = null;
98151
try {
99-
scheduler.shutdown();
100-
// wait at most 1 seconds to close the scheduler
101-
if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
102-
scheduler.shutdownNow();
152+
connection = LocationUtil.getMetaRefreshConnection(url, sysUA);
153+
statement = connection.createStatement();
154+
rs = statement.executeQuery(sql);
155+
boolean alive = false;
156+
while (rs.next()) {
157+
String res = rs.getString("detect server alive");
158+
alive = res.equalsIgnoreCase("detect server alive");
159+
}
160+
if (alive) {
161+
removeFromSuspectIPs(addr);
162+
} else {
163+
calcFailureOrClearCache(addr);
164+
}
165+
} catch (Throwable t) {
166+
logger.debug("check alive failed, server: {}", addr.toString(), t);
167+
if (t instanceof SQLException) {
168+
// occurred during query
169+
calcFailureOrClearCache(addr);
170+
} if (t instanceof ObTableEntryRefreshException) {
171+
// occurred during connection construction
172+
ObTableEntryRefreshException e = (ObTableEntryRefreshException) t;
173+
if (e.isConnectInactive()) {
174+
calcFailureOrClearCache(addr);
175+
} else {
176+
logger.warn("background check-alive mechanic meet ObTableEntryRefreshException, server: {}", addr.toString(), t);
177+
removeFromSuspectIPs(addr);
178+
}
179+
} else {
180+
// silence resolving
181+
logger.warn("background check-alive mechanic meet exception, server: {}", addr.toString(), t);
182+
removeFromSuspectIPs(addr);
183+
}
184+
} finally {
185+
try {
186+
if (rs != null) {
187+
rs.close();
188+
}
189+
if (statement != null) {
190+
statement.close();
191+
}
192+
if (connection != null) {
193+
connection.close();
194+
}
195+
} catch (SQLException e) {
196+
// ignore
103197
}
104-
} catch (InterruptedException e) {
105-
logger.warn("scheduler await for terminate interrupted: {}.", e.getMessage());
106-
scheduler.shutdownNow();
107198
}
108199
}
109200

201+
public static void addIntoSuspectIPs(SuspectObServer server) throws Exception {
202+
ObServerAddr addr = server.getAddr();
203+
if (suspectServers.get(addr) != null) {
204+
// already in the list, directly return
205+
return;
206+
}
207+
long addInterval = 20000L; // 20s
208+
Lock tempLock = new ReentrantLock();
209+
Lock lock = suspectLocks.putIfAbsent(addr, tempLock);
210+
lock = (lock == null) ? tempLock : lock;
211+
boolean acquired = false;
212+
try {
213+
int retryTimes = 0;
214+
while (true) {
215+
try {
216+
acquired = lock.tryLock(1, TimeUnit.SECONDS);
217+
if (!acquired) {
218+
throw new ObTableTryLockTimeoutException("try to get suspect server lock timeout, timeout: 1s");
219+
}
220+
if (suspectServers.get(addr) != null) {
221+
// already in the list, directly break
222+
break;
223+
}
224+
Long lastServerAccessTs = serverLastAccessTimestamps.get(addr);
225+
if (lastServerAccessTs != null) {
226+
long interval = System.currentTimeMillis() - lastServerAccessTs;
227+
if (interval < addInterval) {
228+
// do not repeatedly add within 20 seconds since last adding
229+
break;
230+
}
231+
}
232+
logger.debug("add into suspect list, server: {}", addr);
233+
suspectServers.put(addr, server);
234+
serverLastAccessTimestamps.put(addr, server.getAccessTimestamp());
235+
break;
236+
} catch (ObTableTryLockTimeoutException e) {
237+
// if try lock timeout, need to retry
238+
++retryTimes;
239+
logger.warn("wait to try lock to timeout 1s when add observer into suspect ips, server: {}, tryTimes: {}",
240+
addr.toString(), retryTimes, e);
241+
}
242+
} // end while
243+
} finally {
244+
if (acquired) {
245+
lock.unlock();
246+
}
247+
}
248+
}
249+
250+
private void removeFromSuspectIPs(ObServerAddr addr) {
251+
Lock lock = suspectLocks.get(addr);
252+
if (lock == null) {
253+
// lock must have been added before remove
254+
throw new ObTableUnexpectedException(String.format("ObServer [%s:%d] need to be add into suspect ips before remove",
255+
addr.getIp(), addr.getSvrPort()));
256+
}
257+
boolean acquired = false;
258+
try {
259+
int retryTimes = 0;
260+
while (true) {
261+
try {
262+
acquired = lock.tryLock(1, TimeUnit.SECONDS);
263+
if (!acquired) {
264+
throw new ObTableTryLockTimeoutException("try to get suspect server lock timeout, timeout: 1s");
265+
}
266+
// no need to remove lock
267+
suspectServers.remove(addr);
268+
logger.debug("removed server from suspect list: {}", addr);
269+
break;
270+
} catch (ObTableTryLockTimeoutException e) {
271+
// if try lock timeout, need to retry
272+
++retryTimes;
273+
logger.warn("wait to try lock to timeout when add observer into suspect ips, server: {}, tryTimes: {}",
274+
addr.toString(), retryTimes, e);
275+
} catch (InterruptedException e) {
276+
// do not throw exception to user layer
277+
// next background task will continue to remove it
278+
logger.warn("waiting to get lock while interrupted by other threads", e);
279+
}
280+
}
281+
} finally {
282+
if (acquired) {
283+
lock.unlock();
284+
}
285+
}
286+
}
287+
288+
private void calcFailureOrClearCache(ObServerAddr addr) {
289+
int failureLimit = 3;
290+
TableRoute tableRoute = tableClient.getTableRoute();
291+
SuspectObServer server = suspectServers.get(addr);
292+
server.incrementFailure();
293+
int failure = server.getFailure();
294+
if (failure >= failureLimit) {
295+
tableRoute.removeObServer(addr);
296+
removeFromSuspectIPs(addr);
297+
}
298+
logger.debug("background keep-alive mechanic failed to receive response, server: {}, failure: {}",
299+
addr, failure);
300+
}
301+
302+
public static class SuspectObServer {
303+
private final ObServerAddr addr;
304+
private final long accessTimestamp;
305+
private int failure;
306+
public SuspectObServer(ObServerAddr addr) {
307+
this.addr = addr;
308+
accessTimestamp = System.currentTimeMillis();
309+
failure = 0;
310+
}
311+
public ObServerAddr getAddr() {
312+
return this.addr;
313+
}
314+
public long getAccessTimestamp() {
315+
return this.accessTimestamp;
316+
}
317+
public int getFailure() {
318+
return this.failure;
319+
}
320+
public void incrementFailure() {
321+
++failure;
322+
}
323+
}
110324
}

src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public List<ObServerAddr> refreshTablesAndGetNewServers(List<ReplicaLocation> ne
9595

9696
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
9797
.setLoginInfo(tenantName, userName, password, database, clientType) //
98-
.setProperties(properties).setConfigs(tableConfigs).build();
98+
.setProperties(properties).setConfigs(tableConfigs).setObServerAddr(addr).build();
9999
ObTable oldObTable = tables.putIfAbsent(addr, obTable);
100100
logger.warn("add new table addr, {}", addr.toString());
101101
if (oldObTable != null) { // maybe create two ob table concurrently, close current ob table

0 commit comments

Comments
 (0)