Skip to content

Commit c5127e7

Browse files
committed
Fix stuck when stopping a DataNode with large unremovable WAL files (#15727)
* Fix stuck when stopping a DataNode with large unremovable WAL files
* spotless
* add shutdown hook watcher
* Fix logDispatcher stuck
* add re-interrupt
1 parent 2a7a8e6 commit c5127e7

File tree

9 files changed

+220
-5
lines changed

9 files changed

+220
-5
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,16 @@ public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad)
100100
setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad));
101101
return this;
102102
}
103+
104+
@Override
105+
public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
106+
setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize));
107+
return this;
108+
}
109+
110+
@Override
111+
public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
112+
setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs));
113+
return this;
114+
}
103115
}

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,4 +758,8 @@ public long getPid() {
758758
return -1;
759759
}
760760
}
761+
762+
public Process getInstance() {
763+
return instance;
764+
}
761765
}

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,14 @@ public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
6363
public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
6464
return this;
6565
}
66+
67+
@Override
68+
public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
69+
return this;
70+
}
71+
72+
@Override
73+
public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
74+
return this;
75+
}
6676
}

integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,8 @@ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
3939
DataNodeConfig setLoadLastCacheStrategy(String strategyName);
4040

4141
DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
42+
43+
DataNodeConfig setWalThrottleSize(long walThrottleSize);
44+
45+
DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
4246
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.it;
21+
22+
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
23+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
24+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
25+
import org.apache.iotdb.itbase.category.ClusterIT;
26+
import org.apache.iotdb.rpc.IoTDBConnectionException;
27+
import org.apache.iotdb.rpc.StatementExecutionException;
28+
import org.apache.iotdb.session.Session;
29+
30+
import org.junit.Test;
31+
import org.junit.experimental.categories.Category;
32+
import org.junit.runner.RunWith;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.sql.Connection;
37+
import java.sql.ResultSet;
38+
import java.sql.SQLException;
39+
import java.sql.Statement;
40+
import java.util.Date;
41+
42+
import static org.junit.Assert.fail;
43+
44+
/** Tests that may not be satisfied with the default cluster settings. */
45+
@RunWith(IoTDBTestRunner.class)
46+
@Category({ClusterIT.class})
47+
public class IoTDBCustomizedClusterIT {
48+
49+
private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
50+
51+
/**
52+
* When the WAL size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread keeps trying to
53+
* delete WAL files forever, which blocks the DataNode from exiting because the WAL-deletion task
54+
* submitted by the ShutdownHook can never be executed. This test ensures that this blocking is
55+
* fixed.
56+
*/
57+
@Test
58+
public void testWalThrottleStuck()
59+
throws SQLException,
60+
IoTDBConnectionException,
61+
StatementExecutionException,
62+
InterruptedException {
63+
SimpleEnv simpleEnv = new SimpleEnv();
64+
simpleEnv
65+
.getConfig()
66+
.getDataNodeConfig()
67+
.setWalThrottleSize(1)
68+
.setDeleteWalFilesPeriodInMs(100);
69+
simpleEnv
70+
.getConfig()
71+
.getCommonConfig()
72+
.setDataReplicationFactor(3)
73+
.setSchemaReplicationFactor(3)
74+
.setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
75+
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
76+
try {
77+
simpleEnv.initClusterEnvironment(1, 3);
78+
79+
int leaderIndex = -1;
80+
try (Connection connection = simpleEnv.getConnection();
81+
Statement statement = connection.createStatement()) {
82+
// write the first data
83+
statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
84+
// find the leader of the data region
85+
int port = -1;
86+
87+
ResultSet resultSet = statement.executeQuery("SHOW REGIONS");
88+
while (resultSet.next()) {
89+
String regionType = resultSet.getString("Type");
90+
if (regionType.equals("DataRegion")) {
91+
String role = resultSet.getString("Role");
92+
if (role.equals("Leader")) {
93+
port = resultSet.getInt("RpcPort");
94+
break;
95+
}
96+
}
97+
}
98+
99+
if (port == -1) {
100+
fail("Leader not found");
101+
}
102+
103+
for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) {
104+
if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) {
105+
leaderIndex = i;
106+
break;
107+
}
108+
}
109+
}
110+
111+
// stop a follower
112+
int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size();
113+
simpleEnv.getDataNodeWrapperList().get(followerIndex).stop();
114+
System.out.println(
115+
new Date()
116+
+ ":Stopping data node "
117+
+ simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString());
118+
119+
DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex);
120+
// write to the leader to generate wal that cannot be synced
121+
try (Session session = new Session(leader.getIp(), leader.getPort())) {
122+
session.open();
123+
124+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
125+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
126+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
127+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
128+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
129+
}
130+
131+
// wait for wal-delete thread to be scheduled
132+
Thread.sleep(1000);
133+
134+
// stop the leader
135+
leader.getInstance().destroy();
136+
System.out.println(new Date() + ":Stopping data node " + leader.getIpAndPortString());
137+
// confirm the death of the leader
138+
long startTime = System.currentTimeMillis();
139+
while (leader.isAlive()) {
140+
if (System.currentTimeMillis() - startTime > 30000) {
141+
fail("Leader does not exit after 30s");
142+
}
143+
}
144+
} finally {
145+
simpleEnv.cleanClusterEnvironment();
146+
}
147+
}
148+
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public synchronized void start() {
105105
public synchronized void stop() {
106106
if (!threads.isEmpty()) {
107107
threads.forEach(LogDispatcherThread::setStopped);
108-
threads.forEach(LogDispatcherThread::processStopped);
109108
executorService.shutdownNow();
109+
threads.forEach(LogDispatcherThread::processStopped);
110110
int timeout = 10;
111111
try {
112112
if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ public SyncStatus(IndexController controller, IoTConsensusConfig config) {
4444
* @throws InterruptedException
4545
*/
4646
public synchronized void addNextBatch(Batch batch) throws InterruptedException {
47-
while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
48-
|| !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) {
47+
while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
48+
|| !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false))
49+
&& !Thread.interrupted()) {
4950
wait();
5051
}
5152
pendingBatches.add(batch);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,43 @@ public class DataNodeShutdownHook extends Thread {
5252
private static final Logger logger = LoggerFactory.getLogger(DataNodeShutdownHook.class);
5353

5454
private final TDataNodeLocation nodeLocation;
55+
private Thread watcherThread;
5556

5657
public DataNodeShutdownHook(TDataNodeLocation nodeLocation) {
5758
super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName());
5859
this.nodeLocation = nodeLocation;
5960
}
6061

62+
private void startWatcher() {
63+
Thread hookThread = Thread.currentThread();
64+
watcherThread =
65+
new Thread(
66+
() -> {
67+
while (!Thread.interrupted()) {
68+
try {
69+
Thread.sleep(10000);
70+
StackTraceElement[] stackTrace = hookThread.getStackTrace();
71+
StringBuilder stackTraceBuilder =
72+
new StringBuilder("Stack trace of shutdown hook:\n");
73+
for (StackTraceElement traceElement : stackTrace) {
74+
stackTraceBuilder.append(traceElement.toString()).append("\n");
75+
}
76+
logger.info(stackTraceBuilder.toString());
77+
} catch (InterruptedException e) {
78+
Thread.currentThread().interrupt();
79+
return;
80+
}
81+
}
82+
},
83+
"ShutdownHookWatcher");
84+
watcherThread.setDaemon(true);
85+
watcherThread.start();
86+
}
87+
6188
@Override
6289
public void run() {
6390
logger.info("DataNode exiting...");
91+
startWatcher();
6492
// Stop external rpc service firstly.
6593
ExternalRPCService.getInstance().stop();
6694

@@ -77,7 +105,6 @@ public void run() {
77105
.equals(ConsensusFactory.RATIS_CONSENSUS)) {
78106
StorageEngine.getInstance().syncCloseAllProcessor();
79107
}
80-
WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
81108

82109
// We did this work because the RatisConsensus recovery mechanism is different from other
83110
// consensus algorithms, which will replace the underlying storage engine based on its
@@ -141,6 +168,8 @@ public void run() {
141168
"DataNode exits. Jvm memory usage: {}",
142169
MemUtils.bytesCntToStr(
143170
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
171+
172+
watcherThread.interrupt();
144173
}
145174

146175
private void triggerSnapshotForAllDataRegion() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ private void deleteOutdatedFiles() {
186186
getThrottleThreshold());
187187
}
188188
firstLoop = false;
189+
if (Thread.interrupted()) {
190+
logger.info("Timed wal delete thread is interrupted.");
191+
return;
192+
}
189193
}
190194
}
191195

@@ -264,12 +268,15 @@ public void stop() {
264268
if (config.getWalMode() == WALMode.DISABLE) {
265269
return;
266270
}
267-
271+
logger.info("Stopping WALManager");
268272
if (walDeleteThread != null) {
269273
shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
270274
walDeleteThread = null;
271275
}
276+
logger.info("Deleting outdated files before exiting");
277+
deleteOutdatedFilesInWALNodes();
272278
clear();
279+
logger.info("WALManager stopped");
273280
}
274281

275282
private void shutdownThread(ExecutorService thread, ThreadName threadName) {

0 commit comments

Comments
 (0)