Skip to content

Commit fdc7b52

Browse files
committed
inner-2413: recreate heartbeat connection when timeout
1 parent d3b3432 commit fdc7b52

File tree

5 files changed

+35
-0
lines changed

5 files changed

+35
-0
lines changed

src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,9 @@ public String toString() {
182182
public boolean isQuit() {
183183
return connectionRef.getStamp() == 2;
184184
}
185+
186+
187+
public MySQLHeartbeat getHeartbeat() {
188+
return heartbeat;
189+
}
185190
}

src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class MySQLHeartbeat {
4545
private final DbInstanceSyncRecorder asyncRecorder = new DbInstanceSyncRecorder();
4646
private final PhysicalDbInstance source;
4747
protected volatile MySQLHeartbeatStatus status;
48+
private volatile long beginTimeoutTime = 0;
4849
private String heartbeatSQL;
4950
private long heartbeatTimeout; // during the time, heart failed will ignore
5051
private final AtomicInteger errorCount = new AtomicInteger(0);
@@ -251,10 +252,16 @@ private void setTimeout() {
251252
}
252253
if (status != MySQLHeartbeatStatus.TIMEOUT) {
253254
LOGGER.warn("heartbeat to [{}] setTimeout, previous status is {}", source.getConfig().getUrl(), status);
255+
beginTimeoutTime = System.currentTimeMillis();
254256
status = MySQLHeartbeatStatus.TIMEOUT;
255257
}
256258
}
257259

260+
261+
public long getBeginTimeoutTime() {
262+
return beginTimeoutTime;
263+
}
264+
258265
public boolean isHeartBeatOK() {
259266
if (status == MySQLHeartbeatStatus.OK || status == MySQLHeartbeatStatus.INIT) {
260267
return true;

src/main/java/com/actiontech/dble/config/model/SystemConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ private SystemConfig() {
8484
private long idleTimeout = DEFAULT_IDLE_TIMEOUT;
8585
// sql execute timeout (second)
8686
private long sqlExecuteTimeout = 300;
87+
private long heartbeatSqlExecuteTimeout = 10;
8788
// connection will force close if received close packet but haven't been closed after closeTimeout milliseconds.
8889
// set the value too big is not a good idea.
8990
private long closeTimeout = 100;
@@ -782,6 +783,14 @@ public void setSqlExecuteTimeout(long sqlExecuteTimeout) {
782783
}
783784

784785

786+
public long getHeartbeatSqlExecuteTimeout() {
787+
return heartbeatSqlExecuteTimeout;
788+
}
789+
790+
public void setHeartbeatSqlExecuteTimeout(long heartbeatSqlExecuteTimeout) {
791+
this.heartbeatSqlExecuteTimeout = heartbeatSqlExecuteTimeout;
792+
}
793+
785794
public int getTxIsolation() {
786795
return txIsolation;
787796
}
@@ -1698,6 +1707,7 @@ public String toString() {
16981707
", groupConcatMaxLen='" + groupConcatMaxLen +
16991708
", releaseTimeout=" + releaseTimeout +
17001709
", enableAsyncRelease=" + enableAsyncRelease +
1710+
", heartbeatSqlExecuteTimeout=" + heartbeatSqlExecuteTimeout +
17011711
"]";
17021712
}
17031713

src/main/java/com/actiontech/dble/net/IOProcessor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88

99
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
10+
import com.actiontech.dble.backend.heartbeat.HeartbeatSQLJob;
11+
import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat;
12+
import com.actiontech.dble.backend.heartbeat.MySQLHeartbeatStatus;
1013
import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.stage.XAStage;
1114
import com.actiontech.dble.backend.mysql.xa.TxState;
1215
import com.actiontech.dble.buffer.BufferPool;
@@ -187,6 +190,7 @@ private void checkConSendQueue(AbstractConnection c) {
187190

188191
private void backendCheck() {
189192
long sqlTimeout = SystemConfig.getInstance().getSqlExecuteTimeout() * 1000L;
193+
final long heartbeatSqlExecuteTimeout = SystemConfig.getInstance().getHeartbeatSqlExecuteTimeout() * 1000L;
190194
Iterator<Entry<Long, BackendConnection>> it = backends.entrySet().iterator();
191195
while (it.hasNext()) {
192196
BackendConnection c = it.next().getValue();
@@ -226,6 +230,14 @@ private void backendCheck() {
226230
if (!c.getBackendService().isDDL() && c.getState() == PooledConnection.STATE_IN_USE && c.getBackendService().isExecuting() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) {
227231
LOGGER.info("found backend connection SQL timeout ,close it " + c);
228232
c.close("sql timeout");
233+
} else if ((c.getBackendService().getResponseHandler() instanceof HeartbeatSQLJob)) {
234+
if (heartbeatSqlExecuteTimeout > 0) {
235+
final MySQLHeartbeat heartbeat = ((HeartbeatSQLJob) c.getBackendService().getResponseHandler()).getHeartbeat();
236+
if (c.getBackendService().isExecuting() && heartbeat.getStatus() == MySQLHeartbeatStatus.TIMEOUT && heartbeat.getBeginTimeoutTime() < System.currentTimeMillis() - heartbeatSqlExecuteTimeout) {
237+
LOGGER.info("found backend heartbeat connection SQL timeout ,close it " + c);
238+
c.close("heart sql timeout");
239+
}
240+
}
229241
}
230242

231243
// clean closed conn or check time out

src/main/java/com/actiontech/dble/singleton/SystemParams.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ private SystemParams() {
9191
readOnlyParams.add(new ParamInfo("checkTableConsistencyPeriod", sysConfig.getCheckTableConsistencyPeriod() + "ms", "The period of consistency tableStructure check. The default value is 1800000ms(means 30minutes=30*60*1000)"));
9292
readOnlyParams.add(new ParamInfo("processorCheckPeriod", sysConfig.getProcessorCheckPeriod() + "ms", "The period between the jobs for cleaning the closed or overtime connections. The default is 1000ms"));
9393
readOnlyParams.add(new ParamInfo("sqlExecuteTimeout", sysConfig.getSqlExecuteTimeout() + "s", "The max query executing time.If time out,the connection will be closed. The default is 300 seconds"));
94+
readOnlyParams.add(new ParamInfo("heartbeatSqlExecuteTimeout", sysConfig.getHeartbeatSqlExecuteTimeout() + "s", "The max heartbeat query executing time.If time out,the connection will be closed. The default is 10 seconds.set 0 to disable it."));
9495
readOnlyParams.add(new ParamInfo("recordTxn", sysConfig.getRecordTxn() + "", "Whether the transaction be recorded as a file, the default value is 0"));
9596
readOnlyParams.add(new ParamInfo("transactionLogBaseDir", sysConfig.getTransactionLogBaseDir(), "The directory of the transaction record file, the default value is ./txlogs/"));
9697
readOnlyParams.add(new ParamInfo("transactionLogBaseName", sysConfig.getTransactionLogBaseName(), "The name of the transaction record file. The default value is server-tx"));

0 commit comments

Comments
 (0)