Skip to content

Commit 3ff855e

Browse files
authored
Fix stuck when stopping a DataNode with large unremovable WAL files (apache#15727)
* Fix stuck when stopping a DataNode with large unremovable WAL files * spotless * add shutdown hook watcher * Fix logDispatcher stuck * add re-interrupt
1 parent f55dfe0 commit 3ff855e

File tree

9 files changed

+221
-6
lines changed

9 files changed

+221
-6
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,16 @@ public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad)
119119
setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad));
120120
return this;
121121
}
122+
123+
@Override
124+
public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
125+
setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize));
126+
return this;
127+
}
128+
129+
@Override
130+
public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
131+
setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs));
132+
return this;
133+
}
122134
}

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,4 +818,8 @@ public long getPid() {
818818
return -1;
819819
}
820820
}
821+
822+
public Process getInstance() {
823+
return instance;
824+
}
821825
}

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,14 @@ public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
7878
public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
7979
return this;
8080
}
81+
82+
@Override
83+
public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
84+
return this;
85+
}
86+
87+
@Override
88+
public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
89+
return this;
90+
}
8191
}

integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,8 @@ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
4545
DataNodeConfig setLoadLastCacheStrategy(String strategyName);
4646

4747
DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
48+
49+
DataNodeConfig setWalThrottleSize(long walThrottleSize);
50+
51+
DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
4852
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.it;
21+
22+
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
23+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
24+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
25+
import org.apache.iotdb.itbase.category.ClusterIT;
26+
import org.apache.iotdb.rpc.IoTDBConnectionException;
27+
import org.apache.iotdb.rpc.StatementExecutionException;
28+
import org.apache.iotdb.session.Session;
29+
30+
import org.junit.Test;
31+
import org.junit.experimental.categories.Category;
32+
import org.junit.runner.RunWith;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.sql.Connection;
37+
import java.sql.ResultSet;
38+
import java.sql.SQLException;
39+
import java.sql.Statement;
40+
import java.util.Date;
41+
42+
import static org.junit.Assert.fail;
43+
44+
/** Tests that may not be satisfied with the default cluster settings. */
45+
@RunWith(IoTDBTestRunner.class)
46+
@Category({ClusterIT.class})
47+
public class IoTDBCustomizedClusterIT {
48+
49+
private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
50+
51+
/**
52+
* When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try
53+
* deleting wal forever, which will block the DataNode from exiting, because task of deleting wal
54+
* submitted by the ShutdownHook cannot be executed. This test ensures that this blocking is
55+
* fixed.
56+
*/
57+
@Test
58+
public void testWalThrottleStuck()
59+
throws SQLException,
60+
IoTDBConnectionException,
61+
StatementExecutionException,
62+
InterruptedException {
63+
SimpleEnv simpleEnv = new SimpleEnv();
64+
simpleEnv
65+
.getConfig()
66+
.getDataNodeConfig()
67+
.setWalThrottleSize(1)
68+
.setDeleteWalFilesPeriodInMs(100);
69+
simpleEnv
70+
.getConfig()
71+
.getCommonConfig()
72+
.setDataReplicationFactor(3)
73+
.setSchemaReplicationFactor(3)
74+
.setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
75+
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
76+
try {
77+
simpleEnv.initClusterEnvironment(1, 3);
78+
79+
int leaderIndex = -1;
80+
try (Connection connection = simpleEnv.getConnection();
81+
Statement statement = connection.createStatement()) {
82+
// write the first data
83+
statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
84+
// find the leader of the data region
85+
int port = -1;
86+
87+
ResultSet resultSet = statement.executeQuery("SHOW REGIONS");
88+
while (resultSet.next()) {
89+
String regionType = resultSet.getString("Type");
90+
if (regionType.equals("DataRegion")) {
91+
String role = resultSet.getString("Role");
92+
if (role.equals("Leader")) {
93+
port = resultSet.getInt("RpcPort");
94+
break;
95+
}
96+
}
97+
}
98+
99+
if (port == -1) {
100+
fail("Leader not found");
101+
}
102+
103+
for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) {
104+
if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) {
105+
leaderIndex = i;
106+
break;
107+
}
108+
}
109+
}
110+
111+
// stop a follower
112+
int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size();
113+
simpleEnv.getDataNodeWrapperList().get(followerIndex).stop();
114+
System.out.println(
115+
new Date()
116+
+ ":Stopping data node "
117+
+ simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString());
118+
119+
DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex);
120+
// write to the leader to generate wal that cannot be synced
121+
try (Session session = new Session(leader.getIp(), leader.getPort())) {
122+
session.open();
123+
124+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
125+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
126+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
127+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
128+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
129+
}
130+
131+
// wait for wal-delete thread to be scheduled
132+
Thread.sleep(1000);
133+
134+
// stop the leader
135+
leader.getInstance().destroy();
136+
System.out.println(new Date() + ":Stopping data node " + leader.getIpAndPortString());
137+
// confirm the death of the leader
138+
long startTime = System.currentTimeMillis();
139+
while (leader.isAlive()) {
140+
if (System.currentTimeMillis() - startTime > 30000) {
141+
fail("Leader does not exit after 30s");
142+
}
143+
}
144+
} finally {
145+
simpleEnv.cleanClusterEnvironment();
146+
}
147+
}
148+
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public synchronized void start() {
105105
public synchronized void stop() {
106106
if (!threads.isEmpty()) {
107107
threads.forEach(LogDispatcherThread::setStopped);
108-
threads.forEach(LogDispatcherThread::processStopped);
109108
executorService.shutdownNow();
109+
threads.forEach(LogDispatcherThread::processStopped);
110110
int timeout = 10;
111111
try {
112112
if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ public SyncStatus(IndexController controller, IoTConsensusConfig config) {
4444
* @throws InterruptedException
4545
*/
4646
public synchronized void addNextBatch(Batch batch) throws InterruptedException {
47-
while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
48-
|| !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) {
47+
while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
48+
|| !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false))
49+
&& !Thread.interrupted()) {
4950
wait();
5051
}
5152
pendingBatches.add(batch);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,43 @@ public class DataNodeShutdownHook extends Thread {
4949
private static final Logger logger = LoggerFactory.getLogger(DataNodeShutdownHook.class);
5050

5151
private final TDataNodeLocation nodeLocation;
52+
private Thread watcherThread;
5253

5354
public DataNodeShutdownHook(TDataNodeLocation nodeLocation) {
5455
super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName());
5556
this.nodeLocation = nodeLocation;
5657
}
5758

59+
private void startWatcher() {
60+
Thread hookThread = Thread.currentThread();
61+
watcherThread =
62+
new Thread(
63+
() -> {
64+
while (!Thread.interrupted()) {
65+
try {
66+
Thread.sleep(10000);
67+
StackTraceElement[] stackTrace = hookThread.getStackTrace();
68+
StringBuilder stackTraceBuilder =
69+
new StringBuilder("Stack trace of shutdown hook:\n");
70+
for (StackTraceElement traceElement : stackTrace) {
71+
stackTraceBuilder.append(traceElement.toString()).append("\n");
72+
}
73+
logger.info(stackTraceBuilder.toString());
74+
} catch (InterruptedException e) {
75+
Thread.currentThread().interrupt();
76+
return;
77+
}
78+
}
79+
},
80+
"ShutdownHookWatcher");
81+
watcherThread.setDaemon(true);
82+
watcherThread.start();
83+
}
84+
5885
@Override
5986
public void run() {
6087
logger.info("DataNode exiting...");
88+
startWatcher();
6189
// Stop external rpc service firstly.
6290
ExternalRPCService.getInstance().stop();
6391

@@ -77,7 +105,6 @@ public void run() {
77105
.equals(ConsensusFactory.RATIS_CONSENSUS)) {
78106
StorageEngine.getInstance().syncCloseAllProcessor();
79107
}
80-
WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
81108

82109
// We did this work because the RatisConsensus recovery mechanism is different from other
83110
// consensus algorithms, which will replace the underlying storage engine based on its
@@ -114,6 +141,8 @@ public void run() {
114141
"DataNode exits. Jvm memory usage: {}",
115142
MemUtils.bytesCntToStr(
116143
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
144+
145+
watcherThread.interrupt();
117146
}
118147

119148
private void triggerSnapshotForAllDataRegion() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ private void deleteOutdatedFiles() {
180180
// threshold, the system continues to delete expired files until the disk size is smaller than
181181
// the threshold.
182182
boolean firstLoop = true;
183-
while ((firstLoop || shouldThrottle()) && !Thread.interrupted()) {
183+
while ((firstLoop || shouldThrottle())) {
184184
deleteOutdatedFilesInWALNodes();
185185
if (firstLoop && shouldThrottle()) {
186186
logger.warn(
@@ -189,6 +189,10 @@ private void deleteOutdatedFiles() {
189189
getThrottleThreshold());
190190
}
191191
firstLoop = false;
192+
if (Thread.interrupted()) {
193+
logger.info("Timed wal delete thread is interrupted.");
194+
return;
195+
}
192196
}
193197
}
194198

@@ -267,12 +271,15 @@ public void stop() {
267271
if (config.getWalMode() == WALMode.DISABLE) {
268272
return;
269273
}
270-
274+
logger.info("Stopping WALManager");
271275
if (walDeleteThread != null) {
272276
shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
273277
walDeleteThread = null;
274278
}
279+
logger.info("Deleting outdated files before exiting");
280+
deleteOutdatedFilesInWALNodes();
275281
clear();
282+
logger.info("WALManager stopped");
276283
}
277284

278285
private void shutdownThread(ExecutorService thread, ThreadName threadName) {

0 commit comments

Comments
 (0)