Skip to content

Commit 84a8250

Browse files
committed
fix
1 parent c58f327 commit 84a8250

File tree

6 files changed

+76
-24
lines changed

6 files changed

+76
-24
lines changed

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3675,5 +3675,6 @@ public static int metaServiceRpcRetryTimes() {
36753675
@ConfField(mutable = true)
36763676
public static String aws_credentials_provider_version = "v2";
36773677

3678+
@ConfField(mutable = true)
36783679
public static long agent_task_health_check_intervals_ms = 5 * 60 * 1000L; // 5 min
36793680
}

fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@
264264
import org.apache.doris.system.SystemInfoService;
265265
import org.apache.doris.system.SystemInfoService.HostInfo;
266266
import org.apache.doris.task.AgentBatchTask;
267+
import org.apache.doris.task.AgentTaskCleanupDaemon;
267268
import org.apache.doris.task.AgentTaskExecutor;
268269
import org.apache.doris.task.CompactionTask;
269270
import org.apache.doris.task.MasterTaskExecutor;
@@ -580,6 +581,8 @@ public class Env {
580581

581582
private StatisticsMetricCollector statisticsMetricCollector;
582583

584+
private AgentTaskCleanupDaemon agentTaskCleanupDaemon;
585+
583586
// if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it.
584587
private final Map<String, Supplier<MasterDaemon>> configtoThreads = ImmutableMap
585588
.of("dynamic_partition_check_interval_seconds", this::getDynamicPartitionScheduler);
@@ -840,6 +843,7 @@ public Env(boolean isCheckpointCatalog) {
840843
this.dictionaryManager = new DictionaryManager();
841844
this.keyManagerStore = new KeyManagerStore();
842845
this.keyManager = KeyManagerFactory.getKeyManager();
846+
this.agentTaskCleanupDaemon = new AgentTaskCleanupDaemon();
843847
}
844848

845849
public static Map<String, Long> getSessionReportTimeMap() {
@@ -1962,6 +1966,7 @@ protected void startMasterOnlyDaemonThreads() {
19621966
if (keyManager != null) {
19631967
keyManager.init();
19641968
}
1969+
agentTaskCleanupDaemon.start();
19651970
}
19661971

19671972
// start threads that should run on all FE

fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,41 @@
2525
import org.apache.doris.system.SystemInfoService;
2626
import org.apache.doris.thrift.TStatusCode;
2727

28+
import org.apache.logging.log4j.LogManager;
29+
import org.apache.logging.log4j.Logger;
30+
2831
public class AgentTaskCleanupDaemon extends MasterDaemon {
32+
private static final Logger LOG = LogManager.getLogger(AgentTaskCleanupDaemon.class);
33+
2934
public AgentTaskCleanupDaemon() {
3035
super("agent-task-cleanup", Config.agent_task_health_check_intervals_ms);
3136
}
3237

3338
@Override
3439
protected void runAfterCatalogReady() {
40+
LOG.info("Begin to clean up inactive agent tasks");
3541
SystemInfoService infoService = Env.getCurrentSystemInfo();
3642
infoService.getAllClusterBackends(false)
3743
.stream()
3844
.filter(backend -> !backend.isAlive())
3945
.map(Backend::getId)
4046
.forEach(this::removeInactiveBeAgentTasks);
47+
LOG.info("Finish to clean up inactive agent tasks");
4148
}
4249

4350
private void removeInactiveBeAgentTasks(Long beId) {
4451
AgentTaskQueue.removeTask(beId, (agentTask -> {
45-
long tabletId = agentTask.getTabletId();
46-
String errMsg = "BE down, this agent task is aborted. BE=" + beId + ", tablet=" + tabletId;
52+
String errMsg = "BE down, this agent task is aborted";
4753
if (agentTask instanceof PushTask) {
4854
PushTask task = ((PushTask) agentTask);
49-
task.countDownLatchWithStatus(beId, tabletId, new Status(TStatusCode.ABORTED, errMsg));
55+
task.countDownLatchWithStatus(beId, agentTask.getTabletId(), new Status(TStatusCode.ABORTED, errMsg));
5056
}
5157
agentTask.setFinished(true);
5258
agentTask.setErrorCode(TStatusCode.ABORTED);
5359
agentTask.setErrorMsg(errMsg);
60+
if (LOG.isDebugEnabled()) {
61+
LOG.debug("BE down, remove agent task: {}", agentTask);
62+
}
5463
}));
5564
}
5665
}

fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,12 @@ public static synchronized void removeTask(long backendId, TTaskType type, long
100100
public static synchronized void removeTask(long backendId, Consumer<AgentTask> onTaskRemoved) {
101101
Map<TTaskType, Map<Long, AgentTask>> tasks = AgentTaskQueue.tasks.row(backendId);
102102
tasks.forEach((type, taskSet) -> {
103-
taskSet.forEach((signature, task) -> {
104-
removeTask(backendId, type, signature);
105-
onTaskRemoved.accept(task);
106-
});
103+
Iterator<Map.Entry<Long, AgentTask>> it = taskSet.entrySet().iterator();
104+
while (it.hasNext()) {
105+
Map.Entry<Long, AgentTask> entry = it.next();
106+
it.remove();
107+
onTaskRemoved.accept(entry.getValue());
108+
}
107109
});
108110
}
109111

regression-test/suites/fault_injection_p0/test_sc_stuck_when_be_down.groovy renamed to regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import org.apache.doris.regression.suite.ClusterOptions
22

3-
suite("test_sc_stuck_when_be_down", "docker") {
3+
suite("test_sc_fail_when_be_down", "docker") {
44
def options = new ClusterOptions()
55
options.cloudMode = false
66
options.beNum = 3
@@ -11,7 +11,7 @@ suite("test_sc_stuck_when_be_down", "docker") {
1111
docker(options) {
1212
GetDebugPoint().clearDebugPointsForAllBEs()
1313

14-
def tblName = "test_sc_stuck_when_be_down"
14+
def tblName = "test_sc_fail_when_be_down"
1515
sql """ DROP TABLE IF EXISTS ${tblName} """
1616
sql """
1717
CREATE TABLE IF NOT EXISTS ${tblName} (
@@ -28,30 +28,22 @@ suite("test_sc_stuck_when_be_down", "docker") {
2828
sql """ INSERT INTO ${tblName} SELECT number, number, number from numbers("number" = "1024") """
2929

3030
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
31-
try {
32-
sql """ ALTER TABLE ${tblName} MODIFY COLUMN v0 VARCHAR(100) """
33-
sleep(3000)
34-
cluster.stopBackends(1)
35-
waitForSchemaChangeDone {
36-
sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
37-
time 600
38-
}
39-
} finally {
40-
}
41-
42-
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.sleep")
4331
try {
4432
sql """ ALTER TABLE ${tblName} MODIFY COLUMN v1 VARCHAR(100) """
45-
sleep(3000)
33+
sleep(1000)
4634
cluster.stopBackends(1, 2)
35+
sleep(10000)
36+
def ret = sql """ SHOW ALTER TABLE COLUMN WHERE TableName='test_sc_stuck_when_be_down' ORDER BY createtime DESC LIMIT 1 """
37+
println(ret)
4738
waitForSchemaChangeDone {
4839
sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
4940
time 600
5041
}
5142
assertTrue(false)
52-
} catch (Exception ignore) {
43+
} catch (Throwable ignore) {
5344
// do nothing
5445
} finally {
46+
GetDebugPoint().clearDebugPointsForAllBEs()
5547
}
5648
}
57-
}
49+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import org.apache.doris.regression.suite.ClusterOptions
2+
3+
suite("test_sc_success_when_be_down", "docker") {
4+
def options = new ClusterOptions()
5+
options.cloudMode = false
6+
options.beNum = 3
7+
options.feNum = 2
8+
options.enableDebugPoints()
9+
options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
10+
11+
docker(options) {
12+
GetDebugPoint().clearDebugPointsForAllBEs()
13+
14+
def tblName = "test_sc_success_when_be_down"
15+
sql """ DROP TABLE IF EXISTS ${tblName} """
16+
sql """
17+
CREATE TABLE IF NOT EXISTS ${tblName} (
18+
`k` int NOT NULL,
19+
`v0` int NOT NULL,
20+
`v1` int NOT NULL
21+
)
22+
DUPLICATE KEY(`k`)
23+
DISTRIBUTED BY HASH(`k`) BUCKETS 24
24+
PROPERTIES (
25+
"replication_allocation" = "tag.location.default: 3"
26+
)
27+
"""
28+
sql """ INSERT INTO ${tblName} SELECT number, number, number from numbers("number" = "1024") """
29+
30+
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
31+
try {
32+
sql """ ALTER TABLE ${tblName} MODIFY COLUMN v0 VARCHAR(100) """
33+
sleep(3000)
34+
cluster.stopBackends(1)
35+
waitForSchemaChangeDone {
36+
sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
37+
time 600
38+
}
39+
} finally {
40+
GetDebugPoint().clearDebugPointsForAllBEs()
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)