Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.dolphinscheduler.server.master.integration;

import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -39,6 +42,9 @@ public class Repository {
@Autowired
private TaskInstanceDao taskInstanceDao;

@Autowired
private AlertDao alertDao;

/**
* Return the list of process instances for a given workflow definition in ascending order of their IDs.
*/
Expand Down Expand Up @@ -87,4 +93,25 @@ public List<TaskInstance> queryAllTaskInstance() {
return taskInstanceDao.queryAll();
}

/**
 * Return whatever {@code alertDao.listAlerts} yields for the given workflow instance id.
 *
 * @param workflowInstanceId id of the workflow instance whose alerts are requested
 * @return the alerts reported by the DAO for that instance
 */
public List<Alert> queryAlert(final Integer workflowInstanceId) {
    final List<Alert> alertsOfInstance = alertDao.listAlerts(workflowInstanceId);
    return alertsOfInstance;
}

/**
 * Collect the alerts of every workflow instance that belongs to the given workflow definition.
 * The per-instance alert lists are merged into one list ordered by ascending alert id.
 *
 * @param workflowDefinition the workflow definition whose instances are inspected
 * @return an empty list when the definition has no instances, otherwise all alerts sorted by id
 */
public List<Alert> queryAlert(final WorkflowDefinition workflowDefinition) {
    final List<WorkflowInstance> instances = queryWorkflowInstance(workflowDefinition);
    if (instances.isEmpty()) {
        // Nothing was ever started for this definition, so there is nothing to look up.
        return Collections.emptyList();
    }

    final Comparator<Alert> byAscendingAlertId = Comparator.comparingInt(Alert::getId);
    return instances.stream()
            .map(WorkflowInstance::getId)
            .flatMap(instanceId -> queryAlert(instanceId).stream())
            .sorted(byAscendingAlertId)
            .collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
Expand Down Expand Up @@ -1435,4 +1437,191 @@ void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runF
});
masterContainer.assertAllResourceReleased();
}

/**
 * Triggers the workflow defined in {@code workflow_with_timeout_warn_task.yaml} and verifies that
 * exactly one {@link AlertType#TASK_TIMEOUT} alert is recorded for the workflow instance while the
 * task itself is not killed.
 */
@Test
@DisplayName("Test start a workflow which contains a task with timeout warn strategy")
public void testStartWorkflow_withTimeoutWarnTask() {
    final String yaml = "/it/start/workflow_with_timeout_warn_task.yaml";
    final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
    final WorkflowDefinition workflow = context.getOneWorkflow();

    final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
            .workflowDefinition(workflow)
            .runWorkflowCommandParam(new RunWorkflowCommandParam())
            .build();
    final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
    final long expectedProjectCode = workflow.getProjectCode();
    final long expectedWorkflowCode = workflow.getCode();

    // Poll until the timeout alert shows up. The alert assertion is what actually gates the wait:
    // it cannot pass before the task timeout configured in the fixture has elapsed.
    await()
            .atMost(Duration.ofSeconds(90))
            .untilAsserted(() -> {
                // WARN strategy must only notify; the task must not have been killed.
                Assertions
                        .assertThat(repository.queryTaskInstance(workflowInstanceId))
                        .hasSize(1)
                        .satisfiesExactly(taskInstance -> {
                            assertThat(taskInstance.getName()).isEqualTo("warn_task_with_timeout_alert");
                            assertThat(taskInstance.getState()).isNotEqualTo(TaskExecutionStatus.KILL);
                        });

                // Exactly one timeout alert must exist for this workflow instance.
                // No manual debug printing here: this lambda runs on every poll, and the
                // assertion failure message already reports the actual alert list.
                final List<Alert> alerts = repository.queryAlert(workflowInstanceId);
                Assertions
                        .assertThat(alerts)
                        .hasSize(1)
                        .satisfiesExactly(alert -> {
                            assertThat(alert.getProjectCode()).isEqualTo(expectedProjectCode);
                            assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(expectedWorkflowCode);
                            assertThat(alert.getAlertType()).isEqualTo(AlertType.TASK_TIMEOUT);
                        });
            });

    masterContainer.assertAllResourceReleased();
}

/**
 * Triggers the workflow defined in {@code workflow_with_timeout_warn_no_warning_group.yaml}
 * (WARN timeout strategy, 1 minute timeout, no warningGroupId) and verifies that, after the
 * timeout has elapsed, the task has not been killed and no alert has been recorded.
 */
@Test
@DisplayName("Test timeout WARN task does NOT send alert when workflow has no warningGroupId")
public void testStartWorkflow_withTimeoutWarnTaskButNoWarningGroupId() {
    final String yaml = "/it/start/workflow_with_timeout_warn_no_warning_group.yaml";
    final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
    final WorkflowDefinition workflow = context.getOneWorkflow();

    final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
            .workflowDefinition(workflow)
            .runWorkflowCommandParam(new RunWorkflowCommandParam())
            .build();
    final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

    // Both assertions below are "nothing happened" checks, so they are already true the moment
    // the task instance exists. Without an initial delay the await would succeed on its first
    // poll, long before the 60s task timeout, and the test would prove nothing. Delay the first
    // evaluation until after the timeout could have fired (60s timeout + 5s buffer).
    await()
            .pollDelay(Duration.ofSeconds(65))
            .atMost(Duration.ofSeconds(90))
            .untilAsserted(() -> {
                // WARN strategy must not kill the task.
                Assertions
                        .assertThat(repository.queryTaskInstance(workflowInstanceId))
                        .hasSize(1)
                        .satisfiesExactly(taskInstance -> {
                            assertThat(taskInstance.getName())
                                    .isEqualTo("warn_task_no_alert_due_to_missing_warning_group");
                            assertThat(taskInstance.getState()).isNotEqualTo(TaskExecutionStatus.KILL);
                        });

                // NO alert should be sent because warningGroupId is null
                Assertions
                        .assertThat(repository.queryAlert(workflowInstanceId))
                        .isEmpty();
            });

    masterContainer.assertAllResourceReleased();
}

/**
 * Triggers the workflow defined in {@code workflow_with_timeout_warnfailed_task.yaml} and verifies
 * that the task ends up in {@link TaskExecutionStatus#KILL} and that exactly one
 * {@link AlertType#TASK_TIMEOUT} alert is recorded for the workflow instance.
 */
@Test
@DisplayName("Test start a workflow which contains a task with timeout WARNFAILED strategy")
public void testStartWorkflow_withTimeoutWarnFailedTask() {
    final String yaml = "/it/start/workflow_with_timeout_warnfailed_task.yaml";
    final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
    final WorkflowDefinition workflow = context.getOneWorkflow();

    final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
            .workflowDefinition(workflow)
            .runWorkflowCommandParam(new RunWorkflowCommandParam())
            .build();
    final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
    final long expectedProjectCode = workflow.getProjectCode();
    final long expectedWorkflowCode = workflow.getCode();

    // Wait for the timeout to occur, alert to be sent, and task to be killed (timeout + buffer)
    await()
            .atMost(Duration.ofSeconds(90))
            .untilAsserted(() -> {
                // The timed-out task must have been killed, i.e. its state is KILL.
                Assertions
                        .assertThat(repository.queryTaskInstance(workflowInstanceId))
                        .hasSize(1)
                        .satisfiesExactly(taskInstance -> {
                            assertThat(taskInstance.getName()).isEqualTo("warnfailed_task_with_timeout_alert_kill");
                            assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
                        });

                // Exactly one timeout alert must exist for this workflow instance.
                // No manual debug printing here: this lambda runs on every poll, and the
                // assertion failure message already reports the actual alert list.
                final List<Alert> alerts = repository.queryAlert(workflowInstanceId);
                Assertions
                        .assertThat(alerts)
                        .hasSize(1)
                        .satisfiesExactly(alert -> {
                            assertThat(alert.getProjectCode()).isEqualTo(expectedProjectCode);
                            assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(expectedWorkflowCode);
                            assertThat(alert.getAlertType()).isEqualTo(AlertType.TASK_TIMEOUT);
                        });
            });

    masterContainer.assertAllResourceReleased();
}

/**
 * Triggers the workflow defined in {@code workflow_with_timeout_warnfailed_no_warning_group.yaml}
 * and verifies that its single task reaches {@link TaskExecutionStatus#KILL} while no alert is
 * recorded for the workflow instance.
 */
@Test
@DisplayName("Test WARNFAILED task kills the task but does NOT send alert when warningGroupId is null")
public void testStartWorkflow_withTimeoutWarnFailedTaskButNoWarningGroupId() {
    final WorkflowTestCaseContext testContext = workflowTestCaseContextFactory
            .initializeContextFromYaml("/it/start/workflow_with_timeout_warnfailed_no_warning_group.yaml");
    final WorkflowDefinition workflow = testContext.getOneWorkflow();

    final Integer instanceId = workflowOperator.manualTriggerWorkflow(
            WorkflowOperator.WorkflowTriggerDTO.builder()
                    .workflowDefinition(workflow)
                    .runWorkflowCommandParam(new RunWorkflowCommandParam())
                    .build());

    // The KILL assertion gates the wait; the assertions are retried for at most 90 seconds.
    await()
            .atMost(Duration.ofSeconds(90))
            .untilAsserted(() -> {
                // The only task of the workflow must have been killed.
                Assertions
                        .assertThat(repository.queryTaskInstance(instanceId))
                        .hasSize(1)
                        .satisfiesExactly(killedTask -> {
                            assertThat(killedTask.getName())
                                    .isEqualTo("warnfailed_task_no_alert_due_to_missing_warning_group");
                            assertThat(killedTask.getState()).isEqualTo(TaskExecutionStatus.KILL);
                        });

                // No alert is expected for this workflow instance.
                Assertions
                        .assertThat(repository.queryAlert(instanceId))
                        .isEmpty();
            });

    masterContainer.assertAllResourceReleased();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

project:
name: MasterIntegrationTest
code: 1
description: Project for testing timeout alert when warningGroupId is null
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

# Workflow WITHOUT warningGroupId (set to null/omitted)
workflows:
- name: workflow_with_timeout_warn_no_warning_group
code: 1
version: 1
projectCode: 1
description: Workflow with WARN timeout task but NO warningGroupId, should NOT send alert
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
userId: 1
executionType: PARALLEL
# warningGroupId is intentionally omitted (or set to null)

tasks:
- name: warn_task_no_alert_due_to_missing_warning_group
code: 1
version: 1
projectCode: 1
userId: 1
timeoutFlag: 'OPEN'
timeoutNotifyStrategy: 'WARN' # Would send an alert on timeout only if the workflow had a warningGroupId; none is set here, so no alert is expected
timeout: 1 # 1 minute
taskType: LogicFakeTask
taskParams: '{"localParams":[],"shellScript":"sleep 70","resourceList":[]}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH

taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

project:
name: MasterIntegrationTest
code: 1
description: This is a fake project for testing timeout warning alerts
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

# Workflow that contains a task configured with TIMEOUT + WARN strategy
workflows:
- name: workflow_with_timeout_warn_task
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with single timeout task using WARN strategy
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
userId: 1
executionType: PARALLEL
warningGroupId: 1 # Required for sending timeout alerts; must be non-null to trigger alert

# Task that will exceed its timeout limit to trigger a WARN alert (not kill)
tasks:
- name: warn_task_with_timeout_alert
code: 1
version: 1
projectCode: 1
userId: 1
timeoutFlag: 'OPEN' # Enable timeout mechanism
timeoutNotifyStrategy: 'WARN' # Only send alert on timeout (do NOT kill the task)
timeout: 1 # Timeout after 1 minute
taskType: LogicFakeTask
taskParams: '{"localParams":[],"shellScript":"sleep 70","resourceList":[]}' # Sleeps for 70s (> timeout), simulating a hanging task
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH

# Task dependency: this workflow starts with the timeout-warning task
taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
Loading
Loading