Skip to content

Commit dcda8a7

Browse files
committed
Implement background keep-alive mechanism in RouteTableRefresher
1 parent 4b03b22 commit dcda8a7

File tree

9 files changed

+262
-43
lines changed

9 files changed

+262
-43
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -581,12 +581,11 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
581581
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
582582
logger.debug("client execute meet transport timeout, obTable ip:port is {}:{}",
583583
tableParam.getObTable().getIp(), tableParam.getObTable().getPort());
584-
syncRefreshMetadata(true);
585584
TableEntry entry = tableRoute.getTableEntry(tableName);
586585
long partId = tableRoute.getPartId(entry, rowKey);
587586
long tabletId = tableRoute.getTabletIdByPartId(entry, partId);
588-
tableRoute.refreshPartitionLocation(tableName, tabletId, entry);
589587
tableParam.getObTable().setDirty();
588+
dealWithRpcTimeoutForSingleTablet(tableParam.getObTable().getObServerAddr(), tableName, tabletId);
590589
}
591590
calculateContinuousFailure(tableName, ex.getMessage());
592591
throw ex;
@@ -810,12 +809,11 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
810809
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
811810
logger.debug("client execute meet transport timeout, obTable ip:port is {}:{}",
812811
tableParam.getObTable().getIp(), tableParam.getObTable().getPort());
813-
syncRefreshMetadata(true);
814812
TableEntry entry = tableRoute.getTableEntry(tableName);
815813
long partId = tableRoute.getPartId(entry, callback.getRowKey());
816814
long tabletId = tableRoute.getTabletIdByPartId(entry, partId);
817-
tableRoute.refreshPartitionLocation(tableName, tabletId, entry);
818815
tableParam.getObTable().setDirty();
816+
dealWithRpcTimeoutForSingleTablet(tableParam.getObTable().getObServerAddr(), tableName, tabletId);
819817
}
820818
calculateContinuousFailure(tableName, ex.getMessage());
821819
throw ex;
@@ -1145,7 +1143,7 @@ public ObTable addTable(ObServerAddr addr){
11451143
logger.info("server from response not exist in route cache, server ip {}, port {} , execute add Table.", addr.getIp(), addr.getSvrPort());
11461144
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
11471145
.setLoginInfo(tenantName, userName, password, database, getClientType(runningMode)) //
1148-
.setProperties(getProperties()).build();
1146+
.setProperties(getProperties()).setObServerAddr(addr).build();
11491147
tableRoster.put(addr, obTable);
11501148
return obTable;
11511149
} catch (Exception e) {
@@ -1183,6 +1181,17 @@ public Row transformToRow(String tableName, Object[] rowkey) throws Exception {
11831181
return row;
11841182
}
11851183

1184+
public void dealWithRpcTimeoutForSingleTablet(ObServerAddr addr, String tableName, long tabletId) throws Exception {
1185+
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
1186+
RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer(addr);
1187+
tableRoute.addIntoSuspectIPs(suspectAddr);
1188+
}
1189+
public void dealWithRpcTimeoutForBatchTablet(ObServerAddr addr, String tableName) throws Exception {
1190+
tableRoute.refreshTabletLocationBatch(tableName);
1191+
RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer(addr);
1192+
tableRoute.addIntoSuspectIPs(suspectAddr);
1193+
}
1194+
11861195
/**
11871196
* get table name with table group
11881197
* @param tableGroupName table group name

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ public static List<ObServerLdcItem> getServerLdc(ServerRoster serverRoster,
305305
* @param socketTimeout
306306
* @return
307307
*/
308-
private static String formatObServerUrl(ObServerAddr obServerAddr, long connectTimeout,
308+
public static String formatObServerUrl(ObServerAddr obServerAddr, long connectTimeout,
309309
long socketTimeout) {
310310
return format(
311311
"jdbc:mysql://%s/oceanbase?useUnicode=true&characterEncoding=utf-8&connectTimeout=%d&socketTimeout=%d",
@@ -319,7 +319,7 @@ private static String formatObServerUrl(ObServerAddr obServerAddr, long connectT
319319
* @return
320320
* @throws ObTableEntryRefreshException
321321
*/
322-
private static Connection getMetaRefreshConnection(String url, ObUserAuth sysUA)
322+
public static Connection getMetaRefreshConnection(String url, ObUserAuth sysUA)
323323
throws ObTableEntryRefreshException {
324324

325325
try {

src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java

Lines changed: 199 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,68 @@
1616
*/
1717
package com.alipay.oceanbase.rpc.location.model;
1818

19+
import java.sql.Connection;
20+
import java.sql.SQLException;
21+
import java.sql.Statement;
1922
import java.util.*;
23+
import java.util.concurrent.ConcurrentHashMap;
2024
import java.util.concurrent.Executors;
2125
import java.util.concurrent.ScheduledExecutorService;
2226
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.locks.Lock;
28+
import java.util.concurrent.locks.ReentrantLock;
2329

2430
import com.alipay.oceanbase.rpc.ObTableClient;
31+
import com.alipay.oceanbase.rpc.exception.ObTableTryLockTimeoutException;
32+
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
2533
import com.alipay.oceanbase.rpc.location.LocationUtil;
2634
import org.slf4j.Logger;
35+
2736
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.getLogger;
2837

2938
public class RouteTableRefresher {
3039

31-
private static final Logger logger = getLogger(RouteTableRefresher.class);
40+
private static final Logger logger = getLogger(RouteTableRefresher.class);
41+
42+
private static final String sql = "select 'detect server alive' from dual";
43+
44+
private final ObTableClient tableClient;
45+
46+
private final ObUserAuth sysUA;
47+
48+
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
3249

33-
private final ObTableClient tableClient;
50+
private final ConcurrentHashMap<ObServerAddr, Lock> suspectLocks = new ConcurrentHashMap<>(); // ObServer -> access lock
3451

35-
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
52+
private final ConcurrentHashMap<ObServerAddr, SuspectObServer> suspectServers = new ConcurrentHashMap<>(); // ObServer -> information structure
3653

37-
public RouteTableRefresher(ObTableClient tableClient) {
54+
private final HashMap<ObServerAddr, Long> serverLastAccessTimestamps = new HashMap<>(); // ObServer -> last access timestamp
55+
56+
public RouteTableRefresher(ObTableClient tableClient, ObUserAuth sysUA) {
3857
this.tableClient = tableClient;
58+
this.sysUA = sysUA;
59+
}
60+
61+
/**
62+
* check whether observers have changed every 30 seconds
63+
* if changed, refresh in the background; additionally, probe suspect observers for liveness every second
64+
* */
65+
public void start() {
66+
scheduler.scheduleAtFixedRate(this::doRsListCheck, 30, 30, TimeUnit.SECONDS);
67+
scheduler.scheduleWithFixedDelay(this::doCheckAliveTask, 1, 1, TimeUnit.SECONDS);
68+
}
69+
70+
public void close() {
71+
try {
72+
scheduler.shutdown();
73+
// wait at most 1 second for the scheduler to terminate
74+
if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
75+
scheduler.shutdownNow();
76+
}
77+
} catch (InterruptedException e) {
78+
logger.warn("scheduler await for terminate interrupted: {}.", e.getMessage());
79+
scheduler.shutdownNow();
80+
}
3981
}
4082

4183
/**
@@ -86,25 +128,164 @@ private void doRsListCheck() {
86128
}
87129
}
88130

89-
/**
90-
* check whether observers have changed every 30 seconds
91-
* if changed, refresh in the background
92-
* */
93-
public void start() {
94-
scheduler.scheduleAtFixedRate(this::doRsListCheck, 30, 30, TimeUnit.SECONDS);
131+
private void doCheckAliveTask() {
132+
for (Map.Entry<ObServerAddr, SuspectObServer> entry : suspectServers.entrySet()) {
133+
try {
134+
checkAlive(entry.getKey());
135+
} catch (Exception e) {
136+
// handle silently: log the failure and continue with the next suspect server
137+
logger.warn("RouteTableRefresher::doCheckAliveTask fail, failed server: {}", entry.getKey().toString());
138+
}
139+
}
95140
}
96141

97-
public void close() {
142+
private void checkAlive(ObServerAddr addr) {
143+
long connectTimeout = 1000L; // 1s
144+
long socketTimeout = 5000L; // 5s
145+
int failureLimit = 3;
146+
String url = LocationUtil.formatObServerUrl(addr, connectTimeout, socketTimeout);
147+
TableRoute tableRoute = tableClient.getTableRoute();
148+
Connection connection = null;
149+
Statement statement = null;
98150
try {
99-
scheduler.shutdown();
100-
// wait at most 1 second for the scheduler to terminate
101-
if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
102-
scheduler.shutdownNow();
151+
connection = LocationUtil.getMetaRefreshConnection(url, sysUA);
152+
statement = connection.createStatement();
153+
statement.execute(sql);
154+
removeFromSuspectIPs(addr);
155+
} catch (Exception e) {
156+
if (e instanceof SQLException) {
157+
SuspectObServer server = suspectServers.get(addr);
158+
server.incrementFailure();
159+
int failure = server.getFailure();
160+
if (failure >= failureLimit) {
161+
tableRoute.removeObServer(addr);
162+
removeFromSuspectIPs(addr);
163+
}
164+
logger.debug("background keep-alive mechanic failed to receive response, server: {}, failure: {}",
165+
addr, failure, e);
166+
} else {
167+
// non-SQL exception: log only, do not count it toward the failure limit
168+
logger.warn("background check-alive mechanic meet exception, server: {}", addr.toString(), e);
169+
}
170+
} finally {
171+
try {
172+
if (statement != null) {
173+
statement.close();
174+
}
175+
if (connection != null) {
176+
connection.close();
177+
}
178+
} catch (SQLException e) {
179+
// ignore
180+
}
181+
}
182+
}
183+
184+
public void addIntoSuspectIPs(SuspectObServer server) throws Exception {
185+
ObServerAddr addr = server.getAddr();
186+
if (suspectServers.get(addr) != null) {
187+
// already in the list, directly return
188+
return;
189+
}
190+
long addInterval = 20000L; // 20s
191+
Lock tempLock = new ReentrantLock();
192+
Lock lock = suspectLocks.putIfAbsent(addr, tempLock);
193+
lock = (lock == null) ? tempLock : lock;
194+
boolean acquired = false;
195+
try {
196+
int retryTimes = 0;
197+
while (true) {
198+
try {
199+
acquired = lock.tryLock(1, TimeUnit.SECONDS);
200+
if (!acquired) {
201+
throw new ObTableTryLockTimeoutException("try to get suspect server lock timeout, timeout: 1s");
202+
}
203+
if (suspectServers.get(addr) != null) {
204+
// already in the list, directly break
205+
break;
206+
}
207+
Long lastServerAccessTs = serverLastAccessTimestamps.get(addr);
208+
if (lastServerAccessTs != null) {
209+
long interval = System.currentTimeMillis() - lastServerAccessTs;
210+
if (interval < addInterval) {
211+
// do not re-add a server within 20 seconds of its previous addition
212+
break;
213+
}
214+
}
215+
suspectServers.put(addr, server);
216+
serverLastAccessTimestamps.put(addr, server.getAccessTimestamp());
217+
break;
218+
} catch (ObTableTryLockTimeoutException e) {
219+
// if try lock timeout, need to retry
220+
++retryTimes;
221+
logger.warn("wait to try lock to timeout 1s when add observer into suspect ips, server: {}, tryTimes: {}",
222+
addr.toString(), retryTimes, e);
223+
}
224+
} // end while
225+
} finally {
226+
if (acquired) {
227+
lock.unlock();
103228
}
104-
} catch (InterruptedException e) {
105-
logger.warn("scheduler await for terminate interrupted: {}.", e.getMessage());
106-
scheduler.shutdownNow();
107229
}
108230
}
109231

232+
private void removeFromSuspectIPs(ObServerAddr addr) {
233+
Lock lock = suspectLocks.get(addr);
234+
if (lock == null) {
235+
// a lock must already have been registered for this server (when it was added) before it can be removed
236+
throw new ObTableUnexpectedException(String.format("ObServer [%s:%d] need to be add into suspect ips before remove",
237+
addr.getIp(), addr.getSvrPort()));
238+
}
239+
boolean acquired = false;
240+
try {
241+
int retryTimes = 0;
242+
while (true) {
243+
try {
244+
acquired = lock.tryLock(1, TimeUnit.SECONDS);
245+
if (!acquired) {
246+
throw new ObTableTryLockTimeoutException("try to get suspect server lock timeout, timeout: 1s");
247+
}
248+
// no need to remove lock
249+
suspectServers.remove(addr);
250+
break;
251+
} catch (ObTableTryLockTimeoutException e) {
252+
// if try lock timeout, need to retry
253+
++retryTimes;
254+
logger.warn("wait to try lock to timeout when add observer into suspect ips, server: {}, tryTimes: {}",
255+
addr.toString(), retryTimes, e);
256+
} catch (InterruptedException e) {
257+
// do not throw exception to user layer
258+
// next background task will continue to remove it
259+
logger.warn("waiting to get lock while interrupted by other threads", e);
260+
}
261+
}
262+
} finally {
263+
if (acquired) {
264+
lock.unlock();
265+
}
266+
}
267+
}
268+
269+
public static class SuspectObServer {
270+
private final ObServerAddr addr;
271+
private final long accessTimestamp;
272+
private int failure;
273+
public SuspectObServer(ObServerAddr addr) {
274+
this.addr = addr;
275+
accessTimestamp = System.currentTimeMillis();
276+
failure = 0;
277+
}
278+
public ObServerAddr getAddr() {
279+
return this.addr;
280+
}
281+
public long getAccessTimestamp() {
282+
return this.accessTimestamp;
283+
}
284+
public int getFailure() {
285+
return this.failure;
286+
}
287+
public void incrementFailure() {
288+
++failure;
289+
}
290+
}
110291
}

src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public List<ObServerAddr> refreshTablesAndGetNewServers(List<ReplicaLocation> ne
9595

9696
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
9797
.setLoginInfo(tenantName, userName, password, database, clientType) //
98-
.setProperties(properties).setConfigs(tableConfigs).build();
98+
.setProperties(properties).setConfigs(tableConfigs).setObServerAddr(addr).build();
9999
ObTable oldObTable = tables.putIfAbsent(addr, obTable);
100100
logger.warn("add new table addr, {}", addr.toString());
101101
if (oldObTable != null) { // maybe create two ob table concurrently, close current ob table

src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ public void initRoster(TableEntryKey rootServerKey, boolean initialized,
259259
tableClient.getClientType(runningMode))
260260
//
261261
.setProperties(tableClient.getProperties())
262-
.setConfigs(tableClient.getTableConfigs()).build();
262+
.setConfigs(tableClient.getTableConfigs()).setObServerAddr(addr).build();
263263
addr2Table.put(addr, obTable);
264264
servers.add(addr);
265265
} catch (Exception e) {
@@ -353,10 +353,25 @@ public void initRoster(TableEntryKey rootServerKey, boolean initialized,
353353
}
354354

355355
public void launchRouteRefresher() {
356-
routeRefresher = new RouteTableRefresher(tableClient);
356+
routeRefresher = new RouteTableRefresher(tableClient, sysUA);
357357
routeRefresher.start();
358358
}
359359

360+
public void removeObServer(ObServerAddr addr) {
361+
logger.debug("remove useless table addr, {}", addr.toString());
362+
ConcurrentHashMap<ObServerAddr, ObTable> tables = this.tableRoster.getTables();
363+
ObTable table = tables.remove(addr);
364+
if (table != null) {
365+
table.close();
366+
}
367+
List<ObServerAddr> servers = this.serverRoster.getMembers();
368+
servers.remove(addr);
369+
}
370+
371+
public void addIntoSuspectIPs(RouteTableRefresher.SuspectObServer addr) throws Exception {
372+
routeRefresher.addIntoSuspectIPs(addr);
373+
}
374+
360375
/**
361376
* refresh all ob server synchronized, it will not refresh if last refresh time is 1 min ago
362377
* @param newRsList new root servers

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,10 +290,9 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
290290
&& ((ObTableTransportException) e).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
291291
logger.debug("query meet transport timeout, obTable ip:port is {}:{}",
292292
subObTable.getIp(), subObTable.getPort());
293-
client.syncRefreshMetadata(true);
294293
long tabletId = partIdWithIndex.getRight().getTabletId();
295-
client.refreshTableLocationByTabletId(indexTableName, tabletId);
296294
subObTable.setDirty();
295+
client.dealWithRpcTimeoutForSingleTablet(subObTable.getObServerAddr(), indexTableName, tabletId);
297296
}
298297
client.calculateContinuousFailure(indexTableName, e.getMessage());
299298
throw e;

0 commit comments

Comments
 (0)